| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/exec/vunion_node.h" |
| |
| #include <gen_cpp/Exprs_types.h> |
| #include <gen_cpp/PlanNodes_types.h> |
| |
| #include <boost/iterator/iterator_facade.hpp> |
| #include <functional> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| |
| #include "common/status.h" |
| #include "runtime/runtime_state.h" |
| #include "util/runtime_profile.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/columns/columns_number.h" |
| #include "vec/core/block.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/core/columns_with_type_and_name.h" |
| #include "vec/data_types/data_type_number.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/exprs/vexpr_context.h" |
| #include "vec/utils/util.hpp" |
| |
| namespace doris { |
| class DescriptorTbl; |
| class ObjectPool; |
| |
| namespace vectorized { |
| |
| VUnionNode::VUnionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) |
| : ExecNode(pool, tnode, descs), |
| _first_materialized_child_idx(tnode.union_node.first_materialized_child_idx), |
| _const_expr_list_idx(0), |
| _child_idx(0), |
| _child_row_idx(0), |
| _child_eos(false), |
| _to_close_child_idx(-1) {} |
| |
| // Builds the expression contexts of this node from its thrift plan node. |
| // |
| // `tnode.union_node.const_expr_lists` fills `_const_expr_lists` and |
| // `tnode.union_node.result_expr_lists` fills `_child_expr_lists`, one context list per |
| // thrift expr list and in the same order. Returns the first non-OK Status produced by |
| // ExecNode::init() or VExpr::create_expr_trees(); lists converted before a failure stay |
| // in the member vectors. |
| Status VUnionNode::init(const TPlanNode& tnode, RuntimeState* state) { |
|     RETURN_IF_ERROR(ExecNode::init(tnode, state)); |
|     DCHECK(tnode.__isset.union_node); |
| |
|     // Converts each thrift expr list in `texpr_lists` and appends the contexts to `out`. |
|     auto append_expr_lists = [](const auto& texpr_lists, auto& out) -> Status { |
|         for (const auto& texprs : texpr_lists) { |
|             VExprContextSPtrs ctxs; |
|             RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, ctxs)); |
|             out.push_back(ctxs); |
|         } |
|         return Status::OK(); |
|     }; |
| |
|     const auto& union_desc = tnode.union_node; |
|     // Const expr lists first, then the per-child result expr lists. |
|     RETURN_IF_ERROR(append_expr_lists(union_desc.const_expr_lists, _const_expr_lists)); |
|     RETURN_IF_ERROR(append_expr_lists(union_desc.result_expr_lists, _child_expr_lists)); |
|     return Status::OK(); |
| } |
| |
| // Registers the "MaterializeExprsEvaluateTimer" counter and prepares all expression lists. |
| // |
| // Const expr lists are prepared against this node's own `_row_descriptor`; the i-th |
| // result expr list is prepared against the row descriptor of child(i). Returns the first |
| // non-OK Status encountered. |
| Status VUnionNode::prepare(RuntimeState* state) { |
|     SCOPED_TIMER(_runtime_profile->total_time_counter()); |
|     RETURN_IF_ERROR(ExecNode::prepare(state)); |
|     // NOTE(review): kept after ExecNode::prepare(); presumably `_exec_timer` is set up |
|     // there - confirm before reordering. |
|     SCOPED_TIMER(_exec_timer); |
|     _materialize_exprs_evaluate_timer = |
|             ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer"); |
| |
|     for (const VExprContextSPtrs& const_exprs : _const_expr_lists) { |
|         RETURN_IF_ERROR(VExpr::prepare(const_exprs, state, _row_descriptor)); |
|     } |
| |
|     const int num_result_lists = static_cast<int>(_child_expr_lists.size()); |
|     for (int child_id = 0; child_id < num_result_lists; ++child_id) { |
|         const auto& input_row_desc = child(child_id)->row_desc(); |
|         RETURN_IF_ERROR(VExpr::prepare(_child_expr_lists[child_id], state, input_row_desc)); |
|     } |
|     return Status::OK(); |
| } |
| |
| // Opens this node and, if it has any children, the child at `_child_idx`, so that rows |
| // are available to fetch once open() has succeeded. |
| Status VUnionNode::open(RuntimeState* state) { |
|     // NOTE(review): ExecNode::open() is understood to do the same work as |
|     // this->alloc_resource() - confirm. |
|     RETURN_IF_ERROR(ExecNode::open(state)); |
|     if (_children.empty()) { |
|         return Status::OK(); |
|     } |
|     RETURN_IF_ERROR(child(_child_idx)->open(state)); |
|     return Status::OK(); |
| } |
| |
| // Opens the const and result expression lists and then calls ExecNode::alloc_resource(). |
| // |
| // The work runs under `_resource_lock` and is skipped when `_resource_allocated` is |
| // already set; the flag is set only after every step has succeeded, so a failed attempt |
| // can be retried. Returns the first non-OK Status. |
| Status VUnionNode::alloc_resource(RuntimeState* state) { |
|     SCOPED_TIMER(_exec_timer); |
|     SCOPED_TIMER(_runtime_profile->total_time_counter()); |
| |
|     std::lock_guard<std::mutex> guard(_resource_lock); |
|     if (_resource_allocated) { |
|         return Status::OK(); |
|     } |
| |
|     // Opens every expr context list contained in `expr_lists`. |
|     auto open_all = [state](const auto& expr_lists) -> Status { |
|         for (const auto& exprs : expr_lists) { |
|             RETURN_IF_ERROR(VExpr::open(exprs, state)); |
|         } |
|         return Status::OK(); |
|     }; |
|     RETURN_IF_ERROR(open_all(_const_expr_lists)); |
|     RETURN_IF_ERROR(open_all(_child_expr_lists)); |
| |
|     RETURN_IF_ERROR(ExecNode::alloc_resource(state)); |
|     _resource_allocated = true; |
|     return Status::OK(); |
| } |
| |
| // Fetches one block straight from the current passthrough child into `block`. |
| // |
| // If the previous child ended (`_child_eos`), the current child is opened first. When |
| // the current child reports eos, its index is stored in `_to_close_child_idx` and |
| // `_child_idx` advances to the next child. `block` must be empty on entry. |
| Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) { |
|     DCHECK(!reached_limit()); |
|     DCHECK_LT(_child_idx, _children.size()); |
|     DCHECK(is_child_passthrough(_child_idx)); |
|     if (_child_eos) { |
|         // The previous child is exhausted, so this child has not been opened yet. |
|         RETURN_IF_ERROR(child(_child_idx)->open(state)); |
|         _child_eos = false; |
|     } |
|     DCHECK_EQ(block->rows(), 0); |
| |
|     // Callback handed to get_next_after_projects(); it forwards to the child's (virtual) |
|     // get_next(RuntimeState*, Block*, bool*) overload. |
|     auto source = _children[_child_idx]; |
|     auto fetch = [source](RuntimeState* rs, vectorized::Block* out, bool* done) -> Status { |
|         return source->get_next(rs, out, done); |
|     }; |
|     RETURN_IF_ERROR( |
|             child(_child_idx)->get_next_after_projects(state, block, &_child_eos, fetch)); |
| |
|     if (_child_eos) { |
|         // Do not close() the child here even though it is at eos; only remember it, and |
|         // get_next() closes it at the start of its next call. |
|         // NOTE(review): presumably the block just handed out may still depend on data |
|         // owned by the child - confirm. |
|         // TODO: Remove this as part of IMPALA-4179. |
|         _to_close_child_idx = _child_idx; |
|         ++_child_idx; |
|     } |
|     return Status::OK(); |
| } |
| |
| // Pulls blocks from the children that need materialization, evaluates the matching |
| // result exprs on them and merges the results into `block`. |
| // |
| // Keeps fetching while has_more_materialized() holds and the merged row count has not |
| // exceeded state->batch_size(). A child that reports eos advances `_child_idx`; the next |
| // child is opened lazily on the following iteration. Returns the first non-OK Status. |
| Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { |
|     DCHECK(!reached_limit()); |
|     DCHECK_LT(_child_idx, _children.size()); |
| |
|     MutableBlock merged = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor); |
| |
|     Block input; |
|     while (has_more_materialized() && merged.rows() <= state->batch_size()) { |
|         DCHECK(!is_child_passthrough(_child_idx)); |
|         DCHECK_LT(_child_idx, _children.size()); |
|         // `_child_eos` being set means the previous child finished, so the current one |
|         // still has to be opened and its row cursor reset. |
|         if (_child_eos) { |
|             RETURN_IF_ERROR(child(_child_idx)->open(state)); |
|             _child_eos = false; |
|             _child_row_idx = 0; |
|         } |
|         // Every fetch starts from an empty block; the child block is not memory-reused |
|         // because its columns are materialized below. |
|         input.clear(); |
| |
|         // Callback handed to get_next_after_projects(); it forwards to the child's |
|         // (virtual) get_next(RuntimeState*, Block*, bool*) overload. |
|         auto source = _children[_child_idx]; |
|         auto fetch = [source](RuntimeState* rs, vectorized::Block* out, bool* done) -> Status { |
|             return source->get_next(rs, out, done); |
|         }; |
|         RETURN_IF_ERROR( |
|                 child(_child_idx)->get_next_after_projects(state, &input, &_child_eos, fetch)); |
| |
|         SCOPED_TIMER(_materialize_exprs_evaluate_timer); |
|         if (input.rows() > 0) { |
|             Block materialized; |
|             RETURN_IF_ERROR(materialize_block(&input, _child_idx, &materialized)); |
|             RETURN_IF_ERROR(merged.merge(materialized)); |
|         } |
|         // The limit cannot have been reached here: the returned-row counter has not been |
|         // updated yet. |
|         DCHECK(!reached_limit()); |
|         if (_child_eos) { |
|             ++_child_idx; |
|         } |
|     } |
|     block->set_columns(std::move(merged.mutable_columns())); |
| |
|     DCHECK_LE(_child_idx, _children.size()); |
|     return Status::OK(); |
| } |
| |
| // Evaluates the remaining const expr lists and appends one result row per list to `block`. |
| // |
| // Starting at `_const_expr_list_idx`, each list is executed against a temporary one-row |
| // block; only the produced result columns are kept and merged into the output. The loop |
| // stops when all lists are consumed or the merged row count exceeds state->batch_size(). |
| // |
| // Returns InternalError when a list has a different number of exprs than the first list, |
| // or when the number of result columns differs from the output block's column count; |
| // otherwise the first non-OK Status from expr execution or merging. |
| Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { |
|     SCOPED_TIMER(_exec_timer); |
|     DCHECK_EQ(state->per_fragment_instance_idx(), 0); |
|     DCHECK_LT(_const_expr_list_idx, _const_expr_lists.size()); |
| |
|     MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor); |
|     for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size(); |
|          ++_const_expr_list_idx) { |
|         // One-row scratch block for the const exprs to execute against. |
|         Block tmp_block; |
|         tmp_block.insert({vectorized::ColumnUInt8::create(1), |
|                           std::make_shared<vectorized::DataTypeUInt8>(), ""}); |
| |
|         const auto& const_exprs = _const_expr_lists[_const_expr_list_idx]; |
|         // Counts stay unsigned so they compare cleanly with size() and the loop index. |
|         const size_t num_exprs = const_exprs.size(); |
|         if (_const_expr_list_idx > 0 && num_exprs != _const_expr_lists[0].size()) { |
|             return Status::InternalError( |
|                     "[UnionNode]const expr at {}'s count({}) not matched({} expected)", |
|                     _const_expr_list_idx, num_exprs, _const_expr_lists[0].size()); |
|         } |
| |
|         std::vector<int> result_list(num_exprs); |
|         for (size_t i = 0; i < num_exprs; ++i) { |
|             RETURN_IF_ERROR(const_exprs[i]->execute(&tmp_block, &result_list[i])); |
|         } |
|         // Keep only the columns produced by the exprs. |
|         tmp_block.erase_not_in(result_list); |
|         if (tmp_block.columns() != mblock.columns()) { |
|             return Status::InternalError( |
|                     "[UnionNode]columns count of const expr block not matched ({} vs {})", |
|                     tmp_block.columns(), mblock.columns()); |
|         } |
|         if (tmp_block.rows() > 0) { |
|             // tmp_block is destroyed at the end of this iteration; no explicit clear needed. |
|             RETURN_IF_ERROR(mblock.merge(tmp_block)); |
|         } |
|     } |
|     block->set_columns(std::move(mblock.mutable_columns())); |
| |
|     // Some insert queries, e.g. "insert into string_test select 1, repeat('a', 1024 * 1024);", |
|     // keep the const exprs in the output exprs, which leaves this node with an empty block. |
|     // Insert one row so that executing the const exprs still returns at least one row. |
|     if (block->rows() == 0) { |
|         block->insert({vectorized::ColumnUInt8::create(1), |
|                        std::make_shared<vectorized::DataTypeUInt8>(), ""}); |
|     } |
|     return Status::OK(); |
| } |
| |
| // For the pipeline operator. |
| // Materializes `input_block`, which came from child `child_id`, and appends the result |
| // to `output_block`. |
| // |
| // Does nothing when `input_block` has no rows. `child_id` must refer to an existing, |
| // non-passthrough child. `state` is not used by this function. Returns the first non-OK |
| // Status from materialization or merging. |
| Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id, |
|                                            vectorized::Block* input_block, |
|                                            vectorized::Block* output_block) { |
|     DCHECK_LT(child_id, _children.size()); |
|     DCHECK(!is_child_passthrough(child_id)); |
|     if (input_block->rows() == 0) { |
|         return Status::OK(); |
|     } |
| |
|     MutableBlock merged = |
|             VectorizedUtils::build_mutable_mem_reuse_block(output_block, _row_descriptor); |
|     Block materialized; |
|     RETURN_IF_ERROR(materialize_block(input_block, child_id, &materialized)); |
|     RETURN_IF_ERROR(merged.merge(materialized)); |
| |
|     output_block->set_columns(std::move(merged.mutable_columns())); |
|     return Status::OK(); |
| } |
| |
| // Produces the next output block of the union. |
| // |
| // Sources are drained in a fixed priority: passthrough children, then children that |
| // need materialization, then the const expr lists; at most one source kind is used per |
| // call. The block is then filtered with `_conjuncts`. `*eos` is set when no source has |
| // anything left, and reached_limit(block, eos) is applied last. |
| Status VUnionNode::get_next(RuntimeState* state, Block* block, bool* eos) { |
|     SCOPED_TIMER(_runtime_profile->total_time_counter()); |
|     RETURN_IF_CANCELLED(state); |
| |
|     // TODO: Rethink the logic, which causes the exec node to be closed twice. |
|     if (_to_close_child_idx != -1) { |
|         // A passthrough child that hit eos in the previous call is closed now; |
|         // get_next_pass_through() only recorded its index. The close status is |
|         // deliberately discarded. |
|         DCHECK(is_child_passthrough(_to_close_child_idx)); |
|         static_cast<void>(child(_to_close_child_idx)->close(state)); |
|         _to_close_child_idx = -1; |
|     } |
| |
|     if (has_more_passthrough()) { |
|         RETURN_IF_ERROR(get_next_pass_through(state, block)); |
|     } else if (has_more_materialized()) { |
|         RETURN_IF_ERROR(get_next_materialized(state, block)); |
|     } else if (has_more_const(state)) { |
|         RETURN_IF_ERROR(get_next_const(state, block)); |
|     } |
| |
|     SCOPED_TIMER(_exec_timer); |
|     RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, block, block->columns())); |
| |
|     // End of stream once none of the three sources has anything left. |
|     *eos = !(has_more_passthrough() || has_more_materialized() || has_more_const(state)); |
|     reached_limit(block, eos); |
| |
|     return Status::OK(); |
| } |
| |
| // Releases this node's resources and closes the ExecNode base. Calling it on a node |
| // that is already closed does nothing and returns OK. |
| Status VUnionNode::close(RuntimeState* state) { |
|     if (!is_closed()) { |
|         release_resource(state); |
|         return ExecNode::close(state); |
|     } |
|     return Status::OK(); |
| } |
| |
| // Forwards to ExecNode::release_resource() unless the node is already closed. |
| void VUnionNode::release_resource(RuntimeState* state) { |
|     if (!is_closed()) { |
|         ExecNode::release_resource(state); |
|     } |
| } |
| |
| // Appends a human-readable description of this node to `*out`. |
| // |
| // The text is indented by two spaces per `indentation_level` (a negative level is |
| // treated as zero), lists `_first_materialized_child_idx` and every child expr list, |
| // then appends the ExecNode base description followed by ")" and a newline. |
| void VUnionNode::debug_string(int indentation_level, std::stringstream* out) const { |
|     // A negative level would convert to a huge size_t and make std::string throw. |
|     const size_t indent_width = |
|             indentation_level > 0 ? static_cast<size_t>(indentation_level) * 2 : 0; |
|     *out << std::string(indent_width, ' '); |
|     *out << "_union(_first_materialized_child_idx=" << _first_materialized_child_idx |
|          << " _child_expr_lists=["; |
|     for (const auto& child_expr_list : _child_expr_lists) { |
|         *out << VExpr::debug_string(child_expr_list) << ", "; |
|     } |
|     *out << "] \n"; |
|     ExecNode::debug_string(indentation_level, out); |
|     // Plain newline: flushing an in-memory stringstream through std::endl gains nothing. |
|     *out << ")" << '\n'; |
| } |
| |
| // Evaluates the result exprs of child `child_idx` on `src_block` and stores the |
| // resulting columns, in expr order, in `*res_block`. |
| // |
| // Each expr is executed against `src_block` and reports the position of its result |
| // column there; that column is copied into the output. `_child_row_idx` is advanced by |
| // the number of rows in `src_block`. Returns the first non-OK Status from expr execution. |
| Status VUnionNode::materialize_block(Block* src_block, int child_idx, Block* res_block) { |
|     SCOPED_TIMER(_exec_timer); |
|     const auto& result_exprs = _child_expr_lists[child_idx]; |
| |
|     ColumnsWithTypeAndName output_columns; |
|     for (const auto& expr_ctx : result_exprs) { |
|         int position = -1; |
|         RETURN_IF_ERROR(expr_ctx->execute(src_block, &position)); |
|         output_columns.emplace_back(src_block->get_by_position(position)); |
|     } |
| |
|     _child_row_idx += src_block->rows(); |
|     *res_block = {output_columns}; |
|     return Status::OK(); |
| } |
| |
| } // namespace vectorized |
| } // namespace doris |