Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "analytic_sink_operator.h"

#include <glog/logging.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <cstddef>
#include <cstdint>
Expand Down Expand Up @@ -280,13 +281,29 @@ bool AnalyticSinkLocalState::_get_next_for_partition(int64_t current_block_rows,

bool AnalyticSinkLocalState::_get_next_for_unbounded_range(int64_t current_block_rows,
int64_t current_block_base_pos) {
auto& p = _parent->cast<AnalyticSinkOperatorX>();
_update_order_by_range();
if (p._partition_exprs.size() == 1) {
auto dbg_str = apache::thrift::ThriftDebugString(p._partition_exprs[0]);
if (dbg_str.find("category_ids") != std::string::npos) {
LOG(WARNING) << "xxxx _execute_impl _order_by_pose, is_ended: "
<< _order_by_pose.is_ended << ", start: " << _order_by_pose.start
<< ", end: " << _order_by_pose.end;
}
}
if (!_order_by_pose.is_ended) {
DCHECK(!_partition_by_pose.is_ended);
_need_more_data = true;
return false;
}
while (_current_row_position < _order_by_pose.end) {
if (p._partition_exprs.size() == 1) {
auto dbg_str = apache::thrift::ThriftDebugString(p._partition_exprs[0]);
if (dbg_str.find("category_ids") != std::string::npos) {
LOG(WARNING) << "xxxx _execute_impl _current_row_position : "
<< _current_row_position;
}
}
if (_current_row_position == _order_by_pose.start) {
_execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
_order_by_pose.start, _order_by_pose.end);
Expand Down Expand Up @@ -342,9 +359,19 @@ bool AnalyticSinkLocalState::_get_next_for_range_between(int64_t current_block_r
}

Status AnalyticSinkLocalState::_execute_impl() {
auto& p = _parent->cast<AnalyticSinkOperatorX>();
while (_output_block_index < _input_blocks.size()) {
{
_get_partition_by_end();
if (p._partition_exprs.size() == 1) {
auto dbg_str = apache::thrift::ThriftDebugString(p._partition_exprs[0]);
if (dbg_str.find("category_ids") != std::string::npos) {
LOG(WARNING) << "xxxx _execute_impl _partition_by_pose, is_ended: "
<< _partition_by_pose.is_ended
<< ", start: " << _partition_by_pose.start
<< ", end: " << _partition_by_pose.end;
}
}
// streaming_mode means no need get all parition data, could calculate data when it's arrived
if (!_partition_by_pose.is_ended && (!_streaming_mode || _need_more_data)) {
_need_more_data = false;
Expand Down Expand Up @@ -649,7 +676,19 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
_has_window(tnode.analytic_node.__isset.window),
_has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE),
_has_window_start(tnode.analytic_node.window.__isset.window_start),
_has_window_end(tnode.analytic_node.window.__isset.window_end) {}
_has_window_end(tnode.analytic_node.window.__isset.window_end) {
if (_partition_exprs.size() == 1) {
auto dbg_str = apache::thrift::ThriftDebugString(_partition_exprs[0]);
if (dbg_str.find("category_ids") != std::string::npos) {
LOG(WARNING) << "xxxx AnalyticSinkOperatorX:: _is_colocate: " << _is_colocate
<< ", _require_bucket_distribution: " << _require_bucket_distribution
<< ", _has_window: " << _has_window
<< ", _has_range_window: " << _has_range_window
<< ", _has_window_start: " << _has_window_start
<< ", _has_window_end: " << _has_window_end;
}
}
}

Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
Expand Down Expand Up @@ -779,6 +818,12 @@ Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
if (input_block->rows() <= 0) {
return Status::OK();
}
if (_partition_exprs.size() == 1) {
auto dbg_str = apache::thrift::ThriftDebugString(_partition_exprs[0]);
if (dbg_str.find("category_ids") != std::string::npos) {
LOG(WARNING) << "xxxx AnalyticSinkOperatorX::sink block: " << input_block->dump_data();
}
}
auto& local_state = get_local_state(state);
local_state._input_block_first_row_positions.emplace_back(local_state._input_total_rows);
size_t block_rows = input_block->rows();
Expand Down
Loading
Loading