// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "analytic_sink_operator.h"
#include <glog/logging.h>
#include <cstddef>
#include <cstdint>
#include <string>
#include "pipeline/exec/operator.h"
#include "runtime/runtime_state.h"
#include "vec/exprs/vectorized_agg_fn.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_evaluation_timer = ADD_TIMER(custom_profile(), "EvaluationTime");
_compute_agg_data_timer = ADD_TIMER(custom_profile(), "ComputeAggDataTime");
_compute_partition_by_timer = ADD_TIMER(custom_profile(), "ComputePartitionByTime");
_compute_order_by_timer = ADD_TIMER(custom_profile(), "ComputeOrderByTime");
_compute_range_between_function_timer = ADD_TIMER(custom_profile(), "ComputeRangeBetweenTime");
_partition_search_timer = ADD_TIMER(custom_profile(), "PartitionSearchTime");
_order_search_timer = ADD_TIMER(custom_profile(), "OrderSearchTime");
_remove_rows_timer = ADD_TIMER(custom_profile(), "RemoveRowsTime");
_remove_rows = ADD_COUNTER(custom_profile(), "RemoveRows", TUnit::UNIT);
_remove_count = ADD_COUNTER(custom_profile(), "RemoveCount", TUnit::UNIT);
_blocks_memory_usage =
common_profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
auto& p = _parent->cast<AnalyticSinkOperatorX>();
if (!p._has_window || (!p._has_window_start && !p._has_window_end)) {
// No window clause, or neither boundary set: the frame is the whole partition,
// i.e. `ROWS|RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`.
_executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_partition;
} else if (p._has_range_window) {
if (!p._has_window_start &&
p._window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) {
// For window frame `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`
_executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_unbounded_range;
_streaming_mode = true;
} else {
_executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_range_between;
}
} else {
if (!p._has_window_start) {
_executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_unbounded_rows;
} else {
_executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_sliding_rows;
}
_streaming_mode = true;
_support_incremental_calculate = (p._has_window_start && p._has_window_end);
// TAnalyticWindowBoundaryType::PRECEDING -> negative
// TAnalyticWindowBoundaryType::CURRENT_ROW -> set zero
// TAnalyticWindowBoundaryType::FOLLOWING -> positive
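// Illustrative example: `ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING` yields
// _rows_start_offset = -2 and _rows_end_offset = 1; the get_next functions then translate
// these offsets into per-row frame boundaries clamped to the current partition.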
if (p._has_window_start) { //calculate start boundary
TAnalyticWindowBoundary b = p._window.window_start;
if (b.__isset.rows_offset_value) { //[offset , ]
_rows_start_offset = b.rows_offset_value;
if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
_rows_start_offset *= -1;
}
} else {
DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW); //[current row, ]
_rows_start_offset = 0;
}
}
if (p._has_window_end) { //calculate end boundary
TAnalyticWindowBoundary b = p._window.window_end;
if (b.__isset.rows_offset_value) { //[ , offset]
_rows_end_offset = b.rows_offset_value;
if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
_rows_end_offset *= -1;
}
} else {
DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW); //[ ,current row]
_rows_end_offset = 0;
}
}
}
custom_profile()->add_info_string("streaming mode: ", std::to_string(_streaming_mode));
return Status::OK();
}
Status AnalyticSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::open(state));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<AnalyticSinkOperatorX>();
_agg_functions_size = p._agg_functions_size;
_agg_expr_ctxs.resize(_agg_functions_size);
_agg_functions.resize(_agg_functions_size);
_agg_input_columns.resize(_agg_functions_size);
_offsets_of_aggregate_states.resize(_agg_functions_size);
_result_column_nullable_flags.resize(_agg_functions_size);
_result_column_could_resize.resize(_agg_functions_size);
_use_null_result.resize(_agg_functions_size, 0);
_could_use_previous_result.resize(_agg_functions_size, 0);
for (int i = 0; i < _agg_functions_size; ++i) {
_agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool());
_agg_input_columns[i].resize(p._num_agg_input[i]);
_agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size());
for (int j = 0; j < p._agg_expr_ctxs[i].size(); ++j) {
RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, _agg_expr_ctxs[i][j]));
_agg_input_columns[i][j] = _agg_expr_ctxs[i][j]->root()->data_type()->create_column();
}
_offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i];
_result_column_nullable_flags[i] =
!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
_agg_functions[i]->data_type()->is_nullable();
_result_column_could_resize[i] =
_agg_functions[i]->function()->result_column_could_resize();
if (PARTITION_FUNCTION_SET.contains(_agg_functions[i]->function()->get_name())) {
_streaming_mode = false;
}
_support_incremental_calculate &=
_agg_functions[i]->function()->supported_incremental_mode();
}
_partition_exprs_size = p._partition_by_eq_expr_ctxs.size();
_partition_by_eq_expr_ctxs.resize(_partition_exprs_size);
_partition_by_columns.resize(_partition_exprs_size);
for (size_t i = 0; i < _partition_exprs_size; i++) {
RETURN_IF_ERROR(
p._partition_by_eq_expr_ctxs[i]->clone(state, _partition_by_eq_expr_ctxs[i]));
_partition_by_columns[i] =
_partition_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
}
_order_by_exprs_size = p._order_by_eq_expr_ctxs.size();
_order_by_eq_expr_ctxs.resize(_order_by_exprs_size);
_order_by_columns.resize(_order_by_exprs_size);
for (size_t i = 0; i < _order_by_exprs_size; i++) {
RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, _order_by_eq_expr_ctxs[i]));
_order_by_columns[i] = _order_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
}
// Range-between frames support only one order-by column, so two columns are needed:
// one for the lower bound expression and one for the upper bound expression.
_range_between_expr_ctxs.resize(p._range_between_expr_ctxs.size());
_range_result_columns.resize(_range_between_expr_ctxs.size());
for (size_t i = 0; i < _range_between_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._range_between_expr_ctxs[i]->clone(state, _range_between_expr_ctxs[i]));
_range_result_columns[i] =
_range_between_expr_ctxs[i]->root()->data_type()->create_column();
}
_fn_place_ptr = _agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
p._align_aggregate_states);
_create_agg_status();
return Status::OK();
}
Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
if (_closed) {
return Status::OK();
}
_destroy_agg_status();
_fn_place_ptr = nullptr;
_result_window_columns.clear();
_agg_input_columns.clear();
_partition_by_columns.clear();
_order_by_columns.clear();
_range_result_columns.clear();
return PipelineXSinkLocalState<AnalyticSharedState>::close(state, exec_status);
}
bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t current_block_rows,
int64_t current_block_base_pos) {
const bool is_n_following_frame = _rows_end_offset > 0;
while (_current_row_position < _partition_by_pose.end) {
int64_t current_row_start = _current_row_position + _rows_start_offset;
int64_t current_row_end = _current_row_position + _rows_end_offset + 1;
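// current_row_end is exclusive (hence the + 1): e.g. `ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING`
// gives the half-open frame [p - 2, p + 2) for the row at position p, i.e. rows p - 2 through p + 1.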
if (is_n_following_frame && !_partition_by_pose.is_ended &&
current_row_end > _partition_by_pose.end) {
_need_more_data = true;
break;
}
if (_support_incremental_calculate) {
_execute_for_function<true>(_partition_by_pose.start, _partition_by_pose.end,
current_row_start, current_row_end);
} else {
_reset_agg_status();
// Defensive clamp: ensure current_row_start <= current_row_end; e.g. an end bound further
// PRECEDING than the start bound would otherwise produce an inverted (empty) frame.
current_row_start = std::min(current_row_start, current_row_end);
_execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
current_row_start, current_row_end);
}
int64_t pos = current_pos_in_block();
_insert_result_info(pos, pos + 1);
_current_row_position++;
// The current row was the last row of this block, so the block can be output.
if (_current_row_position - current_block_base_pos >= current_block_rows) {
return true;
}
}
return false;
}
bool AnalyticSinkLocalState::_get_next_for_unbounded_rows(int64_t current_block_rows,
int64_t current_block_base_pos) {
const bool is_n_following_frame = _rows_end_offset > 0;
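// Sketch of the accumulation pattern: for `ROWS BETWEEN UNBOUNDED PRECEDING AND N FOLLOWING`
// (N > 0), the first row of a partition accumulates [partition_start, row + N) up front, and
// every row then adds only the single newly included row [row + N, row + N + 1). For
// `... AND CURRENT ROW`, the per-row step alone is enough.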
while (_current_row_position < _partition_by_pose.end) {
int64_t current_row_end = _current_row_position + _rows_end_offset + 1;
// Frames with an unbounded start ([UNBOUNDED PRECEDING, CURRENT ROW] or
// [UNBOUNDED PRECEDING, N FOLLOWING]) only ever grow, so the previous result can be reused:
// keep accumulating into the same aggregate state and never call _reset_agg_status here.
if (is_n_following_frame && !_partition_by_pose.is_ended &&
current_row_end > _partition_by_pose.end) {
_need_more_data = true;
break;
}
if (is_n_following_frame && _current_row_position == _partition_by_pose.start) {
_execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
_partition_by_pose.start, current_row_end - 1);
}
_execute_for_function(_partition_by_pose.start, _partition_by_pose.end, current_row_end - 1,
current_row_end);
int64_t pos = current_pos_in_block();
_insert_result_info(pos, pos + 1);
_current_row_position++;
// The current row was the last row of this block, so the block can be output.
if (_current_row_position - current_block_base_pos >= current_block_rows) {
return true;
}
}
return false;
}
bool AnalyticSinkLocalState::_get_next_for_partition(int64_t current_block_rows,
int64_t current_block_base_pos) {
if (_current_row_position == _partition_by_pose.start) {
_execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
_partition_by_pose.start, _partition_by_pose.end);
}
// The partition may span multiple input blocks; emit results block by block (bounded by the
// batch size) and never past the partition end.
auto window_end_pos = _current_row_position + current_block_rows;
window_end_pos = std::min<int64_t>(window_end_pos, _partition_by_pose.end);
auto previous_window_frame_width = _current_row_position - current_block_base_pos;
auto current_window_frame_width = window_end_pos - current_block_base_pos;
// should not exceed block batch size
current_window_frame_width = std::min<int64_t>(current_window_frame_width, current_block_rows);
auto real_deal_with_width = current_window_frame_width - previous_window_frame_width;
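// Example: if the block starts at position 100 with 4096 rows, the partition ends at
// position 3000 and _current_row_position is 2500, then real_deal_with_width is
// min(3000, 2500 + 4096) - 100 - (2500 - 100) = 500 result rows emitted in this call.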
int64_t pos = current_pos_in_block();
_insert_result_info(pos, pos + real_deal_with_width);
_current_row_position += real_deal_with_width;
return _current_row_position - current_block_base_pos >= current_block_rows;
}
bool AnalyticSinkLocalState::_get_next_for_unbounded_range(int64_t current_block_rows,
int64_t current_block_base_pos) {
_update_order_by_range();
if (!_order_by_pose.is_ended) {
DCHECK(!_partition_by_pose.is_ended);
_need_more_data = true;
return false;
}
while (_current_row_position < _order_by_pose.end) {
if (_current_row_position == _order_by_pose.start) {
_execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
_order_by_pose.start, _order_by_pose.end);
}
auto previous_window_frame_width = _current_row_position - current_block_base_pos;
auto current_window_frame_width = _order_by_pose.end - current_block_base_pos;
current_window_frame_width =
std::min<int64_t>(current_window_frame_width, current_block_rows);
auto real_deal_with_width = current_window_frame_width - previous_window_frame_width;
int64_t pos = current_pos_in_block();
_insert_result_info(pos, pos + real_deal_with_width);
_current_row_position += real_deal_with_width;
if (_current_row_position - current_block_base_pos >= current_block_rows) {
return true;
}
}
return false;
}
bool AnalyticSinkLocalState::_get_next_for_range_between(int64_t current_block_rows,
int64_t current_block_base_pos) {
while (_current_row_position < _partition_by_pose.end) {
_reset_agg_status();
if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) {
_order_by_pose.start = _partition_by_pose.start;
} else {
_order_by_pose.start = find_first_not_equal(
_range_result_columns[0].get(), _order_by_columns[0].get(),
_current_row_position, _order_by_pose.start, _partition_by_pose.end);
}
if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_end) {
_order_by_pose.end = _partition_by_pose.end;
} else {
_order_by_pose.end = find_first_not_equal(
_range_result_columns[1].get(), _order_by_columns[0].get(),
_current_row_position, _order_by_pose.end, _partition_by_pose.end);
}
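// Note: only one order-by column is supported here; _range_result_columns[0] and [1] hold the
// per-row values of the frame's lower and upper bound expressions, and the bounds are searched
// for within the sorted _order_by_columns[0] of the current partition.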
_execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
_order_by_pose.start, _order_by_pose.end);
int64_t pos = current_pos_in_block();
_insert_result_info(pos, pos + 1);
_current_row_position++;
if (_current_row_position - current_block_base_pos >= current_block_rows) {
return true;
}
}
if (_current_row_position == _partition_by_pose.end) {
_order_by_pose.start = _partition_by_pose.end; // update to next partition pos
_order_by_pose.end = _partition_by_pose.end;
}
return false;
}
Status AnalyticSinkLocalState::_execute_impl() {
while (_output_block_index < _input_blocks.size()) {
{
_get_partition_by_end();
// In streaming mode there is no need to buffer the whole partition; rows are processed as they arrive.
if (!_partition_by_pose.is_ended && (!_streaming_mode || _need_more_data)) {
_need_more_data = false;
break;
}
_init_result_columns();
auto current_block_rows = _input_blocks[_output_block_index].rows();
auto current_block_base_pos =
_input_block_first_row_positions[_output_block_index] - _have_removed_rows;
bool should_output = false;
{
SCOPED_TIMER(_evaluation_timer);
should_output = (this->*_executor.get_next_impl)(current_block_rows,
current_block_base_pos);
}
if (should_output) {
vectorized::Block block;
_output_current_block(&block);
_refresh_buffer_and_dependency_state(&block);
}
if (_current_row_position == _partition_by_pose.end && _partition_by_pose.is_ended) {
_reset_state_for_next_partition();
}
}
}
return Status::OK();
}
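// Core accumulation for one frame. With `incremental == true`, functions that support
// incremental mode (see _support_incremental_calculate) are expected to update the previous
// frame's state by adjusting its edges instead of recomputing the whole frame; otherwise the
// frame's rows are added via add_range_single_place, and callers reset the state beforehand
// whenever the frame does not simply grow.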
template <bool incremental>
void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int64_t partition_end,
int64_t frame_start, int64_t frame_end) {
// Hot path: do not add timers here.
for (size_t i = 0; i < _agg_functions_size; ++i) {
std::vector<const vectorized::IColumn*> agg_columns;
for (int j = 0; j < _agg_input_columns[i].size(); ++j) {
agg_columns.push_back(_agg_input_columns[i][j].get());
}
if constexpr (incremental) {
_agg_functions[i]->function()->execute_function_with_incremental(
partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(),
_agg_arena_pool, false, false, false, &_use_null_result[i],
&_could_use_previous_result[i]);
} else {
_agg_functions[i]->function()->add_range_single_place(
partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(),
_agg_arena_pool, &(_use_null_result[i]), &_could_use_previous_result[i]);
}
}
}
void AnalyticSinkLocalState::_insert_result_info(int64_t start, int64_t end) {
// Hot path: do not add timers here.
for (size_t i = 0; i < _agg_functions_size; ++i) {
if (_result_column_nullable_flags[i]) {
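// The function returns a non-nullable value but the result column is nullable
// (see _result_column_nullable_flags in open()): either insert defaults (NULLs) when the
// result is NULL, or mark the rows non-null and write into the nested column.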
if (_use_null_result[i]) {
_result_window_columns[i]->insert_many_defaults(end - start);
} else {
auto* dst =
assert_cast<vectorized::ColumnNullable*>(_result_window_columns[i].get());
dst->get_null_map_data().resize_fill(
dst->get_null_map_data().size() + static_cast<uint32_t>(end - start), 0);
_agg_functions[i]->function()->insert_result_into_range(
_fn_place_ptr + _offsets_of_aggregate_states[i], dst->get_nested_column(),
start, end);
}
} else {
_agg_functions[i]->function()->insert_result_into_range(
_fn_place_ptr + _offsets_of_aggregate_states[i], *_result_window_columns[i],
start, end);
}
}
}
void AnalyticSinkLocalState::_output_current_block(vectorized::Block* block) {
block->swap(std::move(_input_blocks[_output_block_index]));
_blocks_memory_usage->add(-block->allocated_bytes());
DCHECK(_parent->cast<AnalyticSinkOperatorX>()._change_to_nullable_flags.size() ==
_result_window_columns.size());
for (size_t i = 0; i < _result_window_columns.size(); ++i) {
DCHECK(_result_window_columns[i]);
DCHECK(_agg_functions[i]);
if (_parent->cast<AnalyticSinkOperatorX>()._change_to_nullable_flags[i]) {
block->insert({make_nullable(std::move(_result_window_columns[i])),
make_nullable(_agg_functions[i]->data_type()), ""});
} else {
block->insert(
{std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
}
}
_output_block_index++;
}
void AnalyticSinkLocalState::_init_result_columns() {
if (_current_row_position + _have_removed_rows ==
_input_block_first_row_positions[_output_block_index]) {
_result_window_columns.resize(_agg_functions_size);
// Create each result column from the corresponding function's return type.
for (size_t i = 0; i < _agg_functions_size; ++i) {
_result_window_columns[i] = _agg_functions[i]->data_type()->create_column();
if (_result_column_could_resize[i]) {
_result_window_columns[i]->resize(_input_blocks[_output_block_index].rows());
} else {
_result_window_columns[i]->reserve(_input_blocks[_output_block_index].rows());
}
}
}
}
void AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(vectorized::Block* block) {
size_t buffer_size = 0;
{
std::unique_lock<std::mutex> lc(_shared_state->buffer_mutex);
_shared_state->blocks_buffer.push(std::move(*block));
buffer_size = _shared_state->blocks_buffer.size();
}
if (buffer_size > 128) {
// The buffer holds enough blocks; block the sink until the source drains it.
_dependency->block();
}
// Data was pushed into the buffer; signal the source that it is ready to read.
_dependency->set_ready_to_read();
}
void AnalyticSinkLocalState::_reset_state_for_next_partition() {
_partition_column_statistics.update(_partition_by_pose.end - _partition_by_pose.start);
_order_by_column_statistics.reset();
_partition_by_pose.start = _partition_by_pose.end;
_current_row_position = _partition_by_pose.start;
_reset_agg_status();
}
void AnalyticSinkLocalState::_update_order_by_range() {
// The current peer group (order-by range) still has unprocessed rows; nothing to update yet.
if (_order_by_pose.is_ended && _current_row_position < _order_by_pose.end) {
return;
}
SCOPED_TIMER(_order_search_timer);
while (!_next_order_by_ends.empty()) {
int64_t peek = _next_order_by_ends.front();
_next_order_by_ends.pop();
if (peek > _order_by_pose.end) {
_order_by_pose.start = _order_by_pose.end;
_order_by_pose.end = peek;
_order_by_pose.is_ended = true;
_order_by_column_statistics.update(_order_by_pose.end - _order_by_pose.start);
return;
}
}
if (_order_by_pose.is_ended) {
_order_by_pose.start = _order_by_pose.end;
}
_order_by_pose.end = _partition_by_pose.end;
{
if (_order_by_pose.start < _order_by_pose.end) {
for (size_t i = 0; i < _order_by_exprs_size; ++i) {
_order_by_pose.end = find_first_not_equal(
_order_by_columns[i].get(), _order_by_columns[i].get(),
_order_by_pose.start, _order_by_pose.start, _order_by_pose.end);
}
}
}
if (_order_by_pose.end < _partition_by_pose.end) {
_order_by_column_statistics.update(_order_by_pose.end - _order_by_pose.start);
_order_by_pose.is_ended = true;
_find_next_order_by_ends();
return;
}
DCHECK_EQ(_partition_by_pose.end, _order_by_pose.end);
if (_partition_by_pose.is_ended) {
_order_by_pose.is_ended = true;
return;
}
_order_by_pose.is_ended = false;
}
void AnalyticSinkLocalState::_get_partition_by_end() {
// The current partition still has unprocessed rows; keep the existing partition_by_end.
if (_partition_by_pose.is_ended && _current_row_position < _partition_by_pose.end) {
return;
}
// With no partition-by expressions (or no input yet), the whole input is a single partition
// that only ends at EOS.
if (_partition_by_eq_expr_ctxs.empty() || (_input_total_rows == 0)) {
_partition_by_pose.end = _input_total_rows - _have_removed_rows;
_partition_by_pose.is_ended = _input_eos;
return;
}
SCOPED_TIMER(_partition_search_timer);
while (!_next_partition_ends.empty()) {
int64_t peek = _next_partition_ends.front();
_next_partition_ends.pop();
if (peek > _partition_by_pose.end) {
_partition_by_pose.end = peek;
_partition_by_pose.is_ended = true;
return;
}
}
const auto start = _partition_by_pose.end;
const auto target = (_partition_by_pose.is_ended || _partition_by_pose.end == 0)
? _partition_by_pose.end
: _partition_by_pose.end - 1;
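// `target` is the row whose partition key the search compares against: when the previous
// search ran out of buffered rows without finding a boundary, reuse the last row known to
// belong to the still-open partition (end - 1); otherwise use the first row of the new range.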
DCHECK(_partition_exprs_size > 0);
const auto partition_column_rows = _partition_by_columns[0]->size();
_partition_by_pose.end = partition_column_rows;
{
if (start < _partition_by_pose.end) {
for (size_t i = 0; i < _partition_exprs_size; ++i) {
_partition_by_pose.end = find_first_not_equal(
_partition_by_columns[i].get(), _partition_by_columns[i].get(), target,
start, _partition_by_pose.end);
}
}
}
if (_partition_by_pose.end < partition_column_rows) {
_partition_by_pose.is_ended = true;
_find_next_partition_ends();
return;
}
DCHECK_EQ(_partition_by_pose.end, partition_column_rows);
_partition_by_pose.is_ended = _input_eos;
}
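// When the statistics indicate many small partitions (high cardinality), pre-scan the buffered
// rows once and queue every upcoming partition boundary, so that _get_partition_by_end can pop
// them instead of running one binary search per partition.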
void AnalyticSinkLocalState::_find_next_partition_ends() {
if (!_partition_column_statistics.is_high_cardinality()) {
return;
}
SCOPED_TIMER(_partition_search_timer);
for (size_t i = _partition_by_pose.end + 1; i < _partition_by_columns[0]->size(); ++i) {
for (auto& column : _partition_by_columns) {
auto cmp = column->compare_at(i - 1, i, *column, 1);
if (cmp != 0) {
_next_partition_ends.push(i);
break;
}
}
}
}
void AnalyticSinkLocalState::_find_next_order_by_ends() {
if (!_order_by_column_statistics.is_high_cardinality()) {
return;
}
SCOPED_TIMER(_order_search_timer);
for (size_t i = _order_by_pose.end + 1; i < _partition_by_pose.end; ++i) {
for (auto& column : _order_by_columns) {
auto cmp = column->compare_at(i - 1, i, *column, 1);
if (cmp != 0) {
_next_order_by_ends.push(i);
break;
}
}
}
}
// Binary-search [start, end) of compared_column (whose rows equal to the key form a contiguous
// prefix, since the input is sorted) for the first row that differs from
// reference_column[target]; returns end if every row in the range compares equal.
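// Illustrative example: with keys [a, a, a, b, b], a target row holding 'a', start = 0 and
// end = 5, the search returns 3 (the first 'b'); if every row in [start, end) matches the
// target key, it returns end.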
int64_t AnalyticSinkLocalState::find_first_not_equal(vectorized::IColumn* reference_column,
vectorized::IColumn* compared_column,
int64_t target, int64_t start, int64_t end) {
while (start + 1 < end) {
int64_t mid = start + (end - start) / 2;
if (reference_column->compare_at(target, mid, *compared_column, 1) == 0) {
start = mid;
} else {
end = mid;
}
}
if (reference_column->compare_at(target, end - 1, *compared_column, 1) == 0) {
return end;
}
return end - 1;
}
AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode, const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode, dest_id),
_pool(pool),
_intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
_output_tuple_id(tnode.analytic_node.output_tuple_id),
_buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
? tnode.analytic_node.buffered_tuple_id
: 0),
_is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution),
_partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
? tnode.distribute_expr_lists[0]
: tnode.analytic_node.partition_exprs),
_window(tnode.analytic_node.window),
_has_window(tnode.analytic_node.__isset.window),
_has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE),
_has_window_start(tnode.analytic_node.window.__isset.window_start),
_has_window_end(tnode.analytic_node.window.__isset.window_end) {}
Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
const TAnalyticNode& analytic_node = tnode.analytic_node;
_agg_functions_size = analytic_node.analytic_functions.size();
_agg_expr_ctxs.resize(_agg_functions_size);
_num_agg_input.resize(_agg_functions_size);
for (int i = 0; i < _agg_functions_size; ++i) {
const TExpr& desc = analytic_node.analytic_functions[i];
vectorized::AggFnEvaluator* evaluator = nullptr;
// Window functions treat every NullableAggregateFunction as AlwaysNullable, matching the
// behavior of execution without a group-by key.
// https://github.com/apache/doris/pull/40693
RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(_pool, desc, {}, /*without_key*/ true,
true, &evaluator));
_agg_functions.emplace_back(evaluator);
int node_idx = 0;
_num_agg_input[i] = desc.nodes[0].num_children;
for (int j = 0; j < desc.nodes[0].num_children; ++j) {
++node_idx;
vectorized::VExprSPtr expr;
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(
vectorized::VExpr::create_tree_from_thrift(desc.nodes, &node_idx, expr, ctx));
_agg_expr_ctxs[i].emplace_back(ctx);
}
}
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.partition_exprs,
_partition_by_eq_expr_ctxs));
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.order_by_exprs,
_order_by_eq_expr_ctxs));
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.range_between_offset_exprs,
_range_between_expr_ctxs));
return Status::OK();
}
Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::prepare(state));
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc()));
}
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
_change_to_nullable_flags.resize(_agg_functions_size);
for (size_t i = 0; i < _agg_functions_size; ++i) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_agg_functions[i]->set_version(state->be_exec_version());
_change_to_nullable_flags[i] =
output_slot_desc->is_nullable() && (!_agg_functions[i]->data_type()->is_nullable());
}
if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
std::vector<TTupleId> tuple_ids;
tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_buffered_tuple_id);
RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids);
if (!_partition_by_eq_expr_ctxs.empty()) {
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_partition_by_eq_expr_ctxs, state, cmp_row_desc));
}
if (!_order_by_eq_expr_ctxs.empty()) {
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc));
}
}
if (!_range_between_expr_ctxs.empty()) {
DCHECK(_range_between_expr_ctxs.size() == 2);
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_range_between_expr_ctxs, state, _child->row_desc()));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_range_between_expr_ctxs, state));
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state));
RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
for (size_t i = 0; i < _agg_functions_size; ++i) {
RETURN_IF_ERROR(_agg_functions[i]->open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_agg_expr_ctxs[i], state));
}
_offsets_of_aggregate_states.resize(_agg_functions_size);
for (size_t i = 0; i < _agg_functions_size; ++i) {
_offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;
const auto& agg_function = _agg_functions[i]->function();
// aggregate states are aligned based on maximum requirement
_align_aggregate_states = std::max(_align_aggregate_states, agg_function->align_of_data());
_total_size_of_aggregate_states += agg_function->size_of_data();
// If this is not the last aggregate state, pad it so that the next aggregate state is aligned.
if (i + 1 < _agg_functions_size) {
size_t alignment_of_next_state = _agg_functions[i + 1]->function()->align_of_data();
if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0) {
return Status::RuntimeError("Logical error: align_of_data is not 2^N");
}
/// Extend total_size to next alignment requirement
/// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state.
_total_size_of_aggregate_states =
(_total_size_of_aggregate_states + alignment_of_next_state - 1) /
alignment_of_next_state * alignment_of_next_state;
}
}
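// Illustrative layout: two states of 24 bytes (align 8) and 16 bytes (align 16) get offsets
// 0 and 32 (24 rounded up to the next multiple of 16), _total_size_of_aggregate_states = 48,
// and _align_aggregate_states is raised to at least 16 for the single aligned_alloc in open().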
return Status::OK();
}
Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* input_block,
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
local_state._input_eos = eos;
local_state._remove_unused_rows();
local_state._reserve_mem_size = 0;
SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
RETURN_IF_ERROR(_add_input_block(state, input_block));
RETURN_IF_ERROR(local_state._execute_impl());
if (local_state._input_eos) {
std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock);
local_state._shared_state->sink_eos = true;
local_state._dependency->set_ready_to_read(); // ready for source to read
}
return Status::OK();
}
Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
vectorized::Block* input_block) {
if (input_block->rows() <= 0) {
return Status::OK();
}
auto& local_state = get_local_state(state);
local_state._input_block_first_row_positions.emplace_back(local_state._input_total_rows);
size_t block_rows = input_block->rows();
local_state._input_total_rows += block_rows;
// Record the original column count; expression evaluation below may append temporary columns
// that must not be output.
auto column_to_keep = input_block->columns();
{
SCOPED_TIMER(local_state._compute_agg_data_timer);
// Evaluate the aggregate input expressions into _agg_input_columns; rows already consumed may
// later be trimmed from these columns (see _remove_unused_rows).
for (size_t i = 0; i < _agg_functions_size; ++i) {
for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) {
RETURN_IF_ERROR(_insert_range_column(input_block, local_state._agg_expr_ctxs[i][j],
local_state._agg_input_columns[i][j].get(),
block_rows));
}
}
}
{
SCOPED_TIMER(local_state._compute_partition_by_timer);
for (size_t i = 0; i < local_state._partition_by_eq_expr_ctxs.size(); ++i) {
RETURN_IF_ERROR(
_insert_range_column(input_block, local_state._partition_by_eq_expr_ctxs[i],
local_state._partition_by_columns[i].get(), block_rows));
}
}
{
SCOPED_TIMER(local_state._compute_order_by_timer);
for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i) {
RETURN_IF_ERROR(_insert_range_column(input_block, local_state._order_by_eq_expr_ctxs[i],
local_state._order_by_columns[i].get(),
block_rows));
}
}
{
SCOPED_TIMER(local_state._compute_range_between_function_timer);
for (size_t i = 0; i < local_state._range_between_expr_ctxs.size(); ++i) {
RETURN_IF_ERROR(
_insert_range_column(input_block, local_state._range_between_expr_ctxs[i],
local_state._range_result_columns[i].get(), block_rows));
}
}
vectorized::Block::erase_useless_column(input_block, column_to_keep);
COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes());
COUNTER_UPDATE(local_state._blocks_memory_usage, input_block->allocated_bytes());
local_state._input_blocks.emplace_back(std::move(*input_block));
return Status::OK();
}
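// Periodically trims rows that have already been consumed from the buffered aggregate-input,
// partition-by and order-by columns and rebases all cached positions by the removed row count,
// bounding memory usage and keeping string-column offsets below the 4 GB limit.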
void AnalyticSinkLocalState::_remove_unused_rows() {
// Debug point: skip row removal so string columns can grow past 4 GB in tests.
DBUG_EXECUTE_IF("AnalyticSinkLocalState._remove_unused_rows", { return; });
#ifdef BE_TEST
const size_t block_num = 1;
#else
const size_t block_num = 256;
#endif
if (_removed_block_index + block_num + 1 >= _input_block_first_row_positions.size()) {
return;
}
const int64_t unused_rows_pos =
_input_block_first_row_positions[_removed_block_index + block_num];
if (_streaming_mode) {
auto idx = _output_block_index - 1;
if (idx < 0 || _input_block_first_row_positions[idx] <= unused_rows_pos) {
return;
}
} else {
if (_have_removed_rows + _partition_by_pose.start <= unused_rows_pos) {
return;
}
}
const int64_t remove_rows = unused_rows_pos - _have_removed_rows;
{
SCOPED_TIMER(_remove_rows_timer);
for (size_t i = 0; i < _agg_functions_size; i++) {
for (size_t j = 0; j < _agg_expr_ctxs[i].size(); j++) {
_agg_input_columns[i][j]->erase(0, remove_rows);
}
}
for (size_t i = 0; i < _partition_exprs_size; i++) {
_partition_by_columns[i]->erase(0, remove_rows);
}
for (size_t i = 0; i < _order_by_exprs_size; i++) {
_order_by_columns[i]->erase(0, remove_rows);
}
}
COUNTER_UPDATE(_remove_count, 1);
COUNTER_UPDATE(_remove_rows, remove_rows);
_current_row_position -= remove_rows;
_partition_by_pose.remove_unused_rows(remove_rows);
_order_by_pose.remove_unused_rows(remove_rows);
int64_t candidate_partition_end_size = _next_partition_ends.size();
while (--candidate_partition_end_size >= 0) {
auto peek = _next_partition_ends.front();
_next_partition_ends.pop();
_next_partition_ends.push(peek - remove_rows);
}
int64_t candidate_peer_group_end_size = _next_order_by_ends.size();
while (--candidate_peer_group_end_size >= 0) {
auto peek = _next_order_by_ends.front();
_next_order_by_ends.pop();
_next_order_by_ends.push(peek - remove_rows);
}
_removed_block_index += block_num;
_have_removed_rows += remove_rows;
DCHECK_GE(_current_row_position, 0);
DCHECK_GE(_partition_by_pose.end, 0);
}
size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) {
auto& local_state = get_local_state(state);
return local_state._reserve_mem_size;
}
Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
const vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t length) {
vectorized::ColumnPtr column;
RETURN_IF_ERROR(expr->execute(block, column));
column = column->convert_to_full_column_if_const();
// If dst_column is a string column, its offsets may overflow 4 GB, so insert with overflow
// ignored. The column is only used by compare_at to locate ranges; whether it must be
// converted once it overflows is an open question (TODO).
dst_column->insert_range_from_ignore_overflow(*column, 0, length);
return Status::OK();
}
void AnalyticSinkLocalState::_reset_agg_status() {
_use_null_result.assign(_agg_functions_size, 0);
_could_use_previous_result.assign(_agg_functions_size, 0);
for (size_t i = 0; i < _agg_functions_size; ++i) {
_agg_functions[i]->reset(_fn_place_ptr + _offsets_of_aggregate_states[i]);
}
}
void AnalyticSinkLocalState::_create_agg_status() {
for (size_t i = 0; i < _agg_functions_size; ++i) {
try {
_agg_functions[i]->create(_fn_place_ptr + _offsets_of_aggregate_states[i]);
} catch (...) {
for (int j = 0; j < i; ++j) {
_agg_functions[j]->destroy(_fn_place_ptr + _offsets_of_aggregate_states[j]);
}
throw;
}
}
_agg_functions_created = true;
}
void AnalyticSinkLocalState::_destroy_agg_status() {
if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) {
return;
}
for (size_t i = 0; i < _agg_functions_size; ++i) {
_agg_functions[i]->destroy(_fn_place_ptr + _offsets_of_aggregate_states[i]);
}
}
template class DataSinkOperatorX<AnalyticSinkLocalState>;
} // namespace doris::pipeline