blob: b3c35e5fe54cfdc42ee8e1a37f3f1dc64915ad4f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/exec/repeat_operator.h"
#include <memory>
#include "common/logging.h"
#include "pipeline/exec/operator.h"
#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
namespace doris {
#include "common/compile_check_begin.h"
class RuntimeState;
} // namespace doris
namespace doris::pipeline {
// Constructs the per-pipeline-task state of the repeat operator.
// Forwards `state` and `parent` to the Base constructor, allocates an empty
// block to hold the child's data, and starts at repeat id index 0.
RepeatLocalState::RepeatLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent),
_child_block(vectorized::Block::create_unique()),
_repeat_id_idx(0) {}
// Opens the local state: runs Base::open and then clones every expression
// context of the parent operator into this local state's own _expr_ctxs.
// Returns the first non-OK status encountered, otherwise Status::OK().
Status RepeatLocalState::open(RuntimeState* state) {
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_open_timer);
    RETURN_IF_ERROR(Base::open(state));

    auto& parent_op = _parent->cast<Parent>();
    const size_t expr_count = parent_op._expr_ctxs.size();
    _expr_ctxs.resize(expr_count);
    for (size_t idx = 0; idx != expr_count; ++idx) {
        RETURN_IF_ERROR(parent_op._expr_ctxs[idx]->clone(state, _expr_ctxs[idx]));
    }
    return Status::OK();
}
// Initializes the local state and registers the profile timers used by the
// repeat operator (input evaluation, repeated-data generation, filtering).
// Returns the error from Base::init if it fails, otherwise Status::OK().
Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) {
// Base::init runs before any timer is touched.
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
// Timers are added to the operator's custom profile; push() and pull() scope them.
_evaluate_input_timer = ADD_TIMER(custom_profile(), "EvaluateInputDataTime");
_get_repeat_data_timer = ADD_TIMER(custom_profile(), "GetRepeatDataTime");
_filter_timer = ADD_TIMER(custom_profile(), "FilterTime");
return Status::OK();
}
// Initializes the operator from the thrift plan node.
// Builds the expression trees from tnode.repeat_node.exprs and verifies that every
// entry of _grouping_list provides at least _repeat_id_list_size grouping ids, because
// each entry is later indexed by the repeat id index.
// Returns InternalError on the first entry that is too short.
Status RepeatOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));
    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.repeat_node.exprs, _expr_ctxs));

    for (const auto& grouping_ids : _grouping_list) {
        if (grouping_ids.size() >= _repeat_id_list_size) {
            continue;
        }
        return Status::InternalError(
                "grouping_list size {} is less than repeat_id_list size {}", grouping_ids.size(),
                _repeat_id_list_size);
    }
    return Status::OK();
}
// Prepares the operator for execution.
// Looks up the output tuple descriptor, appends its slot descriptors to
// _output_slots, then prepares and opens the operator-level expression contexts
// against the child's row descriptor.
// Returns InternalError when the output tuple descriptor cannot be found, or the
// first error reported by the base class / expression preparation.
Status RepeatOperatorX::prepare(RuntimeState* state) {
    // The log text previously read "VRepeatNode::open", which named neither this
    // class nor this method and made verbose traces misleading.
    VLOG_CRITICAL << "RepeatOperatorX::prepare";
    RETURN_IF_ERROR(OperatorXBase::prepare(state));

    const auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
    if (output_tuple_desc == nullptr) {
        return Status::InternalError("Failed to get tuple descriptor.");
    }

    const auto& output_tuple_slots = output_tuple_desc->slots();
    // Grow once instead of reallocating while appending.
    _output_slots.reserve(_output_slots.size() + output_tuple_slots.size());
    for (const auto& slot_desc : output_tuple_slots) {
        _output_slots.push_back(slot_desc);
    }

    RETURN_IF_ERROR(vectorized::VExpr::prepare(_expr_ctxs, state, _child->row_desc()));
    RETURN_IF_ERROR(vectorized::VExpr::open(_expr_ctxs, state));
    return Status::OK();
}
// Builds the operator from the thrift plan node by copying the repeat-node
// settings it needs at execution time: the per-repeat slot id sets, the set of
// all repeated slot ids, the number of repeat ids, the grouping id list and the
// output tuple id.
RepeatOperatorX::RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                                 const DescriptorTbl& descs)
        : Base(pool, tnode, operator_id, descs),
          _slot_id_set_list(tnode.repeat_node.slot_id_set_list),
          _all_slot_ids(tnode.repeat_node.all_slot_ids),
          _repeat_id_list_size(tnode.repeat_node.repeat_id_list.size()),
          _grouping_list(tnode.repeat_node.grouping_list),
          _output_tuple_id(tnode.repeat_node.output_tuple_id) {}
// The control logic of RepeatOperator is
// push a block, output _repeat_id_list_size blocks
// In the output block, the first part of the columns comes from the input block's columns, and the latter part of the columns is the grouping_id
// If there is no expr, there is only grouping_id
// If there is an expr, the first part of the columns in the output block uses _all_slot_ids and _slot_id_set_list to control whether it is null
// Reports whether the operator wants another block from its child.
// More input is needed only while the child has not reached eos and the
// currently held child block has no rows left.
bool RepeatOperatorX::need_more_input_data(RuntimeState* state) const {
    auto& local_state = state->get_local_state(operator_id())->cast<RepeatLocalState>();
    if (local_state._child_eos) {
        return false;
    }
    return local_state._child_block->rows() == 0;
}
// Produces the output block for one repeat id from an evaluated input block.
//
// input_block   : block whose columns are the results of the repeat expressions.
// repeat_id_idx : index of the repeat (grouping set) to generate.
// output_block  : must be empty on entry; receives one column per output slot.
//
// The leading output columns mirror the input columns. A column whose slot id is in
// _all_slot_ids is written as a nullable column: all NULL when the slot is not part of
// _slot_id_set_list[repeat_id_idx], otherwise a copy of the input. Any other column is
// copied unchanged. The trailing columns are the grouping ids.
//
// Example: select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2));
//          insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1);
//          slot_id_set_list=[[0],[1]], repeat_id_idx=0
//          child_block  1,2,1 | 1,3,1 | 2,1,1 | 3,1,1
//          output_block 1,null,1,1 | 1,null,1,1 | 2,null,1,1 | 3,null,1,1
Status RepeatLocalState::get_repeated_block(vectorized::Block* input_block, int repeat_id_idx,
                                            vectorized::Block* output_block) {
    auto& p = _parent->cast<RepeatOperatorX>();
    DCHECK(input_block != nullptr);
    DCHECK_EQ(output_block->rows(), 0);

    const size_t input_column_size = input_block->columns();
    const size_t output_column_size = p._output_slots.size();
    DCHECK_LT(input_column_size, output_column_size);

    auto m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
                                                                              p._output_slots);
    auto& output_columns = m_block.mutable_columns();

    // Input column i always maps to output column i, so one counter drives both.
    size_t cur_col = 0;
    for (; cur_col < input_column_size; ++cur_col) {
        const vectorized::ColumnWithTypeAndName& src_column =
                input_block->get_by_position(cur_col);
        const auto slot_id = p._output_slots[cur_col]->id();
        const bool is_repeat_slot = p._all_slot_ids.contains(slot_id);
        const bool is_set_null_slot = !p._slot_id_set_list[repeat_id_idx].contains(slot_id);
        const auto row_size = src_column.column->size();

        if (!is_repeat_slot) {
            // Not a repeated slot: plain copy of the input column.
            output_columns[cur_col]->insert_range_from(*src_column.column, 0, row_size);
            continue;
        }

        DCHECK(p._output_slots[cur_col]->is_nullable());
        auto* nullable_column =
                assert_cast<vectorized::ColumnNullable*>(output_columns[cur_col].get());
        if (is_set_null_slot) {
            // Slot is excluded from this repeat: emit NULL for every row.
            nullable_column->insert_many_defaults(row_size);
        } else if (src_column.type->is_nullable()) {
            nullable_column->insert_range_from(*src_column.column, 0, row_size);
        } else {
            nullable_column->insert_range_from_not_nullable(*src_column.column, 0, row_size);
        }
    }

    // Append the grouping id columns after the copied input columns.
    const auto rows = input_block->rows();
    RETURN_IF_ERROR(add_grouping_id_column(rows, cur_col, output_columns, repeat_id_idx));
    DCHECK_EQ(cur_col, output_column_size);
    return Status::OK();
}
// Appends the grouping id columns for one repeat id.
//
// rows          : number of rows to write into each grouping id column.
// cur_col       : in/out index of the next output column to fill; advanced by one
//                 for every entry of the parent's _grouping_list.
// columns       : mutable output columns; columns[cur_col] must be a ColumnInt64.
// repeat_id_idx : index selecting the value inside each grouping list entry.
//
// Always returns Status::OK(); misuse is caught by the debug checks.
Status RepeatLocalState::add_grouping_id_column(std::size_t rows, std::size_t& cur_col,
                                                vectorized::MutableColumns& columns,
                                                int repeat_id_idx) {
    auto& p = _parent->cast<RepeatOperatorX>();
    for (const auto& grouping_ids : p._grouping_list) {
        // cur_col (not the position inside _grouping_list) is what indexes the
        // output slots and columns below, so that is the value to bounds-check.
        DCHECK_LT(cur_col, p._output_slots.size());
        DCHECK_LT(cur_col, columns.size());
        const int64_t val = grouping_ids[repeat_id_idx];
        DCHECK(!p._output_slots[cur_col]->is_nullable());
        auto* col = assert_cast<vectorized::ColumnInt64*>(columns[cur_col].get());
        col->insert_many_vals(val, rows);
        cur_col++;
    }
    return Status::OK();
}
// Accepts one block from the child and evaluates the repeat expressions on it.
//
// Records `eos` in the local state. When input_block has rows, a fresh
// intermediate block is created and filled with one fully materialized (non-const)
// result column per expression; pull() later expands it once per repeat id.
// An input without rows leaves the intermediate block untouched.
Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block, bool eos) const {
    auto& local_state = get_local_state(state);
    SCOPED_TIMER(local_state._evaluate_input_timer);
    local_state._child_eos = eos;

    auto& intermediate_block = local_state._intermediate_block;
    auto& expr_ctxs = local_state._expr_ctxs;
    DCHECK(!intermediate_block || intermediate_block->rows() == 0);

    if (input_block->rows() == 0) {
        return Status::OK();
    }

    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
    intermediate_block = vectorized::Block::create_unique();
    for (auto& expr : expr_ctxs) {
        int result_column_id = -1;
        RETURN_IF_ERROR(expr->execute(input_block, &result_column_id));
        DCHECK(result_column_id != -1);
        // Take the reference only after execute(), which is what yields the column id.
        auto& result_column = input_block->get_by_position(result_column_id);
        result_column.column = result_column.column->convert_to_full_column_if_const();
        intermediate_block->insert(result_column);
    }
    DCHECK_EQ(expr_ctxs.size(), intermediate_block->columns());
    return Status::OK();
}
// Emits the output block for the current repeat id.
//
// state        : runtime state; the call fails early if the query was cancelled.
// output_block : must be empty on entry; receives the repeated rows.
// eos          : set to true once the child reached eos and the child block is empty
//                (reached_limit may additionally update it).
//
// Each call handles one repeat id and then advances _repeat_id_idx. After the last
// repeat id the buffered input is released and the index wraps to 0, which makes
// need_more_input_data() ask for the next child block.
Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block,
                             bool* eos) const {
    auto& local_state = get_local_state(state);
    auto& _repeat_id_idx = local_state._repeat_id_idx;
    auto& _child_block = *local_state._child_block;
    auto& _child_eos = local_state._child_eos;
    auto& _intermediate_block = local_state._intermediate_block;
    RETURN_IF_CANCELLED(state);
    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);

    DCHECK(_repeat_id_idx >= 0);
    for (const std::vector<int64_t>& v : _grouping_list) {
        DCHECK(_repeat_id_idx <= static_cast<int>(v.size()));
    }
    DCHECK(output_block->rows() == 0);

    // Moves on to the next repeat id; once all repeat ids were produced, drops the
    // buffered data and restarts from index 0.
    // _intermediate_block is only allocated by push() when the input had rows, so it
    // can still be null here (e.g. an empty input that ends with eos); it must be
    // checked before being cleared.
    const auto advance_repeat_id = [&]() {
        _repeat_id_idx++;
        if (_repeat_id_idx >= _repeat_id_list_size) {
            if (_intermediate_block) {
                _intermediate_block->clear();
            }
            _child_block.clear_column_data(_child->row_desc().num_materialized_slots());
            _repeat_id_idx = 0;
        }
    };

    {
        SCOPED_TIMER(local_state._get_repeat_data_timer);
        if (_intermediate_block && _intermediate_block->rows() > 0) {
            // Expressions were evaluated by push(): expand them for this repeat id.
            RETURN_IF_ERROR(local_state.get_repeated_block(_intermediate_block.get(),
                                                           _repeat_id_idx, output_block));
            advance_repeat_id();
        } else if (local_state._expr_ctxs.empty()) {
            // No expressions: the output consists of the grouping id columns only.
            auto m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                    output_block, _output_slots);
            auto rows = _child_block.rows();
            auto& columns = m_block.mutable_columns();
            std::size_t cur_col = 0;
            RETURN_IF_ERROR(
                    local_state.add_grouping_id_column(rows, cur_col, columns, _repeat_id_idx));
            advance_repeat_id();
        }
    }

    {
        SCOPED_TIMER(local_state._filter_timer);
        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
                                                               output_block->columns()));
    }

    *eos = _child_eos && _child_block.rows() == 0;
    local_state.reached_limit(output_block, eos);
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris::pipeline