// blob: 38293f3cae0e87e275195059cdab9c9e0eaf63c9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/exec/union_source_operator.h"
#include <functional>
#include <utility>
#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "runtime/descriptors.h"
#include "util/defer_op.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/data_types/data_type_number.h" // IWYU pragma: keep
namespace doris {
#include "common/compile_check_begin.h"
class RuntimeState;
namespace pipeline {
/// Initializes the local state of the union source.
///
/// When the parent operator has children, the data queue held by the shared state is
/// handed the first source dependency. When it has none, a dependency owned by this
/// local state is created, a wait timer is registered for it, and it is marked ready.
///
/// @param state runtime state forwarded to Base::init().
/// @param info  local state info forwarded to Base::init().
/// @return the error from Base::init() if it fails, otherwise Status::OK().
Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_init_timer);

    const bool has_children = _parent->cast<Parent>().get_child_count() != 0;
    if (!has_children) {
        // No children: build a dependency of our own and make it ready immediately.
        _only_const_dependency = Dependency::create_shared(
                _parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY");
        _dependency = _only_const_dependency.get();
        const auto wait_timer_name = "WaitForDependency[" + _dependency->name() + "]Time";
        _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(common_profile(), wait_timer_name, 1);
        _dependency->set_ready();
        return Status::OK();
    }

    // Children exist: wire the first source dependency into the shared data queue.
    auto* union_shared_state = static_cast<UnionSharedState*>(_dependency->shared_state());
    union_shared_state->data_queue.set_source_dependency(_shared_state->source_deps.front());
    return Status::OK();
}
/// Reports whether this local state needs a shared state.
///
/// An operator without children has no sink, so there is no shared state to set.
///
/// @return true when the parent operator has at least one child.
bool UnionSourceLocalState::must_set_shared_state() const {
    const auto child_count = _parent->cast<Parent>().get_child_count();
    return child_count != 0;
}
/// Opens the local state and, on the first fragment instance only, clones the parent's
/// constant expression lists into this local state.
///
/// The const exprs do not refer to any children. They are materialized only by the
/// fragment instance whose index is 0 to avoid producing duplicate rows.
///
/// @param state runtime state; supplies the fragment instance index and is passed to clone().
/// @return the first error from Base::open() or from cloning an expression, else Status::OK().
Status UnionSourceLocalState::open(RuntimeState* state) {
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_open_timer);
    RETURN_IF_ERROR(Base::open(state));

    if (state->per_fragment_instance_idx() != 0) {
        // Other instances never evaluate the const exprs, so there is nothing to clone.
        return Status::OK();
    }

    auto& p = _parent->cast<Parent>();
    auto& parent_expr_lists = p._const_expr_lists;
    _const_expr_lists.resize(parent_expr_lists.size());
    // Indices are size_t so they are compared against size() without a sign mismatch.
    for (size_t list_idx = 0; list_idx < _const_expr_lists.size(); ++list_idx) {
        auto& cloned_list = _const_expr_lists[list_idx];
        auto& parent_list = parent_expr_lists[list_idx];
        cloned_list.resize(parent_list.size());
        for (size_t expr_idx = 0; expr_idx < cloned_list.size(); ++expr_idx) {
            RETURN_IF_ERROR(parent_list[expr_idx]->clone(state, cloned_list[expr_idx]));
        }
    }
    return Status::OK();
}
std::string UnionSourceLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
if (_shared_state) {
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
_shared_state->data_queue.has_more_data());
}
return fmt::to_string(debug_string_buffer);
}
// Produces the next output block of the union source and decides `*eos`.
//
// - While `_need_read_for_const_expr` is set, const-expr rows are produced through
//   get_next_const() (when has_more_const() reports any) and the flag is refreshed from
//   has_more_const().
// - Otherwise, when the operator has children, one block is fetched from the shared data
//   queue, swapped into `block`, and the fetched block object is cleared and handed back
//   to the queue with push_free_block().
//
// `*eos` is assigned inside the `set_eos` closure declared at the top of the function.
// Returns the first error raised by get_next_const() / get_block_from_queue(), else OK.
Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
// `set_eos` is declared first so that (presumably, per util/defer_op.h) its closure runs
// when the function scope is left, on every return path including early returns.
// NOTE(review): every branch of the closure assigns `*eos`, so it overwrites whatever
// `reached_limit(block, eos)` wrote just before the final return -- confirm this is intended.
Defer set_eos {[&]() {
// The EOS decision of the union operator is subtle; re-check every branch before
// modifying it. For background see: https://github.com/apache/doris/pull/29677
// EOS requires that const exprs have been executed, the queue has no data any more,
// and the children could be closed.
if (_child_size == 0 && !local_state._need_read_for_const_expr) {
// No children and no const exprs left to read: nothing more can be produced.
*eos = true;
} else if (_has_data(state)) {
*eos = false;
} else if (local_state._shared_state->data_queue.is_all_finish()) {
// Check `_has_data(state)` again after `data_queue.is_all_finish()` returned true,
// because one or more blocks may still be present at that point.
*eos = !_has_data(state);
} else {
*eos = false;
}
}};
SCOPED_TIMER(local_state.exec_time_counter());
if (local_state._need_read_for_const_expr) {
// Const-expr phase: emit rows only if some lists remain, then refresh the flag.
if (has_more_const(state)) {
RETURN_IF_ERROR(get_next_const(state, block));
}
local_state._need_read_for_const_expr = has_more_const(state);
} else if (_child_size != 0) {
// Child phase: take one block (and the index of the child it belongs to) from the queue.
std::unique_ptr<vectorized::Block> output_block;
int child_idx = 0;
RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&output_block,
&child_idx));
// The queue handed out no block; return without touching `block`
// (reached_limit() is skipped on this path).
if (!output_block) {
return Status::OK();
}
// Hand the data to the caller by swapping, then clear the swapped-out block and give
// it back to the queue for the same child.
block->swap(*output_block);
output_block->clear_column_data(row_descriptor().num_materialized_slots());
local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx);
}
local_state.reached_limit(block, eos);
return Status::OK();
}
/// Evaluates pending constant expression lists and merges their rows into `block`.
///
/// Starting at the local state's `_const_expr_list_idx`, each const expr list is executed
/// into a temporary block and merged into a mutable block built over `block`. The loop
/// stops when all lists are consumed or the mutable block has reached
/// `state->batch_size()` rows. The index lives in the local state, so the next call
/// resumes from the first list that was not consumed.
///
/// Expected to run only on fragment instance 0 (checked with DCHECK).
///
/// @param state runtime state; supplies the local state and the batch size.
/// @param block output block that receives the const rows.
/// @return InternalError when a list's expression count differs from the first list's or
///         when the temporary block's column count differs from the output's; any error
///         from executing an expression or merging; otherwise Status::OK().
Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Block* block) {
    DCHECK_EQ(state->per_fragment_instance_idx(), 0);
    auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
    DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size());
    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);

    // Alias of the persistent cursor: advancing it here advances the local state's index.
    auto& list_idx = local_state._const_expr_list_idx;
    vectorized::MutableBlock mblock =
            vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, row_descriptor());
    vectorized::ColumnsWithTypeAndName tmp_block_columns;
    for (; list_idx < _const_expr_lists.size() && mblock.rows() < state->batch_size();
         ++list_idx) {
        const auto& expr_list = _const_expr_lists[list_idx];
        // Counts stay size_t so they are compared with size() without a sign mismatch.
        const size_t expr_count = expr_list.size();
        const size_t expected_count = _const_expr_lists[0].size();
        // Every list must have as many expressions as the first one.
        if (list_idx != 0 && expr_count != expected_count) {
            return Status::InternalError(
                    "[UnionNode]const expr at {}'s count({}) not matched({} expected)",
                    list_idx, expr_count, expected_count);
        }

        tmp_block_columns.resize(expr_count);
        for (size_t i = 0; i < expr_count; ++i) {
            RETURN_IF_ERROR(expr_list[i]->execute_const_expr(tmp_block_columns[i]));
        }

        vectorized::Block tmp_block(tmp_block_columns);
        if (tmp_block.columns() != mblock.columns()) {
            return Status::InternalError(
                    "[UnionNode]columns count of const expr block not matched ({} vs {})",
                    tmp_block.columns(), mblock.columns());
        }
        if (tmp_block.rows() > 0) {
            RETURN_IF_ERROR(mblock.merge(tmp_block));
            tmp_block.clear();
        }
    }

    // Some insert queries, e.g. "insert into string_test select 1, repeat('a', 1024 * 1024);",
    // place the const expr in the output exprs, which leaves the union node with an empty
    // block. Add one row so that executing const exprs always returns at least one row.
    if (block->rows() == 0) {
        block->insert({vectorized::ColumnUInt8::create(1),
                       std::make_shared<vectorized::DataTypeUInt8>(), ""});
    }
    return Status::OK();
}
/// Closes the local state.
///
/// Does nothing when the state is already closed. Otherwise terminates the shared data
/// queue, if a shared state exists, and then delegates to Base::close().
///
/// @param state runtime state forwarded to Base::close().
/// @return Status::OK() when already closed, else the result of Base::close().
Status UnionSourceLocalState::close(RuntimeState* state) {
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_close_timer);
    if (!_closed) {
        if (_shared_state) {
            _shared_state->data_queue.terminate();
        }
        return Base::close(state);
    }
    return Status::OK();
}
} // namespace pipeline
#include "common/compile_check_end.h"
} // namespace doris