// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "hashjoin_build_sink.h"
#include <algorithm>
#include <cstdlib>
#include <limits>
#include <string>
#include <variant>
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"
#include "util/pretty_printer.h"
#include "util/uid_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/utils/template_helpers.hpp"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: JoinBuildSinkLocalState(parent, state) {
_finish_dependency = std::make_shared<CountedFinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY");
}
Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_task_idx = info.task_idx;
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
_build_expr_ctxs.resize(p._build_expr_ctxs.size());
for (size_t i = 0; i < _build_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._build_expr_ctxs[i]->clone(state, _build_expr_ctxs[i]));
}
_shared_state->build_exprs_size = _build_expr_ctxs.size();
_should_build_hash_table = true;
custom_profile()->add_info_string("BroadcastJoin", std::to_string(p._is_broadcast_join));
if (p._use_shared_hash_table) {
_should_build_hash_table = info.task_idx == 0;
}
custom_profile()->add_info_string("BuildShareHashTable",
std::to_string(_should_build_hash_table));
custom_profile()->add_info_string("ShareHashTableEnabled",
std::to_string(p._use_shared_hash_table));
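// Shared hash table: for a broadcast join with sharing enabled, only the first
// local task (task_idx == 0) builds the hash table. The other tasks block both
// their sink dependency and their finish dependency here, and register the
// finish dependency with the operator so the building task can release them
// all in close().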
if (!_should_build_hash_table) {
_dependency->block();
_finish_dependency->block();
{
std::lock_guard<std::mutex> guard(p._mutex);
p._finish_dependencies.push_back(_finish_dependency);
}
} else {
_dependency->set_ready();
}
_build_blocks_memory_usage =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "MemoryUsageBuildBlocks", TUnit::BYTES, 1);
_hash_table_memory_usage =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
_build_arena_memory_usage =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "MemoryUsageBuildKeyArena", TUnit::BYTES, 1);
// Build phase
auto* record_profile = _should_build_hash_table ? custom_profile() : faker_runtime_profile();
_build_table_timer = ADD_TIMER(custom_profile(), "BuildHashTableTime");
_build_side_merge_block_timer = ADD_TIMER(custom_profile(), "MergeBuildBlockTime");
_build_table_insert_timer = ADD_TIMER(record_profile, "BuildTableInsertTime");
_build_expr_call_timer = ADD_TIMER(record_profile, "BuildExprCallTime");
_runtime_filter_producer_helper = std::make_shared<RuntimeFilterProducerHelper>(
_should_build_hash_table, p._is_broadcast_join);
RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _build_expr_ctxs,
p._runtime_filter_descs));
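// Runtime filter lifecycle in this sink: the producer helper is initialized
// here, send_filter_size() is called in sink() once the build block is
// complete (eos), and build()/publish() run in close().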
return Status::OK();
}
Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
return Status::OK();
}
Status HashJoinBuildSinkLocalState::terminate(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
if (_terminated) {
return Status::OK();
}
RETURN_IF_ERROR(_runtime_filter_producer_helper->skip_process(state));
return JoinBuildSinkLocalState::terminate(state);
}
size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) {
if (!_should_build_hash_table) {
return 0;
}
if (_shared_state->build_block) {
return 0;
}
size_t size_to_reserve = 0;
const size_t build_block_rows = _build_side_mutable_block.rows();
if (build_block_rows != 0) {
const auto bytes = _build_side_mutable_block.bytes();
const auto allocated_bytes = _build_side_mutable_block.allocated_bytes();
const auto bytes_per_row = bytes / build_block_rows;
const auto estimated_size_of_next_block = bytes_per_row * state->batch_size();
// If the new size is greater than 85% of the allocated bytes, a reallocation is likely needed.
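// For example (illustrative numbers): with 50 MB used, 64 MB allocated and an
// estimated 6 MB for the next batch, (50 + 6) * 100 / 64 = 87 >= 85, so we
// reserve 64 MB * 1.15 ~= 73.6 MB for the expected realloc.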
if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes) >= 85) {
size_to_reserve += static_cast<size_t>(static_cast<double>(allocated_bytes) * 1.15);
}
}
if (eos) {
const size_t rows = build_block_rows + state->batch_size();
const auto bucket_size = hash_join_table_calc_bucket_size(rows);
size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first
size_to_reserve += rows * sizeof(uint32_t); // JoinHashTable::next
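// The reserve math above assumes a bucket-chained layout: `first` holds one
// uint32_t head index per bucket and `next` holds one uint32_t link per row,
// chaining rows that hash to the same bucket.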
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == TJoinOp::RIGHT_OUTER_JOIN ||
p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op == TJoinOp::RIGHT_SEMI_JOIN) {
size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
}
size_to_reserve += _evaluate_mem_usage;
vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
if (build_block_rows > 0) {
auto block = _build_side_mutable_block.to_block();
std::vector<uint16_t> converted_columns;
Defer defer([&]() {
for (auto i : converted_columns) {
auto& data = block.get_by_position(i);
data.column = vectorized::remove_nullable(data.column);
data.type = vectorized::remove_nullable(data.type);
}
_build_side_mutable_block = vectorized::MutableBlock(std::move(block));
});
vectorized::ColumnUInt8::MutablePtr null_map_val;
if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN) {
converted_columns = _convert_block_to_null(block);
// first row is mocked
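// (Row 0 is a sentinel appended in sink() before any real data arrives;
// marking it null presumably keeps it from matching any real probe key.)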
for (int i = 0; i < block.columns(); i++) {
auto [column, is_const] = unpack_if_const(block.safe_get_by_position(i).column);
assert_cast<vectorized::ColumnNullable*>(column->assume_mutable().get())
->get_null_map_column()
.get_data()
.data()[0] = 1;
}
}
null_map_val = vectorized::ColumnUInt8::create();
null_map_val->get_data().assign(build_block_rows, (uint8_t)0);
// Get the key column that needs to be built
Status st = _extract_join_column(block, null_map_val, raw_ptrs, _build_col_ids);
if (!st.ok()) {
throw Exception(st);
}
std::visit(vectorized::Overload {[&](std::monostate& arg) {},
[&](auto&& hash_map_context) {
size_to_reserve += hash_map_context.estimated_size(
raw_ptrs, (uint32_t)block.rows(), true,
true, bucket_size);
}},
_shared_state->hash_table_variant_vector.front()->method_variant);
}
}
return size_to_reserve;
}
Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (!_should_build_hash_table) {
return;
}
// The build side hash key column may not need to be output, but we keep it in the block
// because it is compared with the probe side hash key column.
if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
// When the key column comes from a build side tuple, we should keep it.
// If the key column belongs to the intermediate tuple, then
// _build_col_ids[0] >= _should_keep_column_flags.size() and the key column is kept as well.
if (_build_col_ids[0] < p._should_keep_column_flags.size()) {
p._should_keep_column_flags[_build_col_ids[0]] = true;
}
}
if (_shared_state->build_block) {
// Release the memory of columns that are not needed in the probe stage.
_shared_state->build_block->clear_column_mem_not_keep(p._should_keep_column_flags,
p._use_shared_hash_table);
}
if (p._use_shared_hash_table) {
std::unique_lock lock(p._mutex);
p._signaled = true;
for (auto& dep : _shared_state->sink_deps) {
dep->set_ready();
}
for (auto& dep : p._finish_dependencies) {
dep->set_ready();
}
}
}};
try {
if (!_terminated && _runtime_filter_producer_helper && !state->is_cancelled()) {
RETURN_IF_ERROR(_runtime_filter_producer_helper->build(
state, _shared_state->build_block.get(), p._use_shared_hash_table,
p._runtime_filters));
RETURN_IF_ERROR(_runtime_filter_producer_helper->publish(state));
}
} catch (Exception& e) {
bool blocked_by_shared_hash_table_signal =
!_should_build_hash_table && p._use_shared_hash_table && !p._signaled;
return Status::InternalError(
"rf process meet error: {}, _terminated: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, "
"blocked_by_shared_hash_table_signal: "
"{}",
e.to_string(), _terminated, _should_build_hash_table,
_finish_dependency ? _finish_dependency->debug_string() : "null",
blocked_by_shared_hash_table_signal);
}
if (_runtime_filter_producer_helper) {
_runtime_filter_producer_helper->collect_realtime_profile(custom_profile());
}
return Base::close(state, exec_status);
}
bool HashJoinBuildSinkLocalState::build_unique() const {
return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
}
void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
bool empty_block =
!_shared_state->build_block ||
!(_shared_state->build_block->rows() > 1); // the build side always mocks one row into the block
_shared_state->short_circuit_for_probe =
((_shared_state->_has_null_in_build_side &&
p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
(empty_block &&
(p._join_op == TJoinOp::INNER_JOIN || p._join_op == TJoinOp::LEFT_SEMI_JOIN ||
p._join_op == TJoinOp::RIGHT_OUTER_JOIN || p._join_op == TJoinOp::RIGHT_SEMI_JOIN ||
p._join_op == TJoinOp::RIGHT_ANTI_JOIN))) &&
!p._is_mark_join;
// When the build table has 0 rows, there is no other_join_conjunct, _is_mark_join is false,
// and the join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN,
// the result is simply the probe table plus null columns (if output is needed).
_shared_state->empty_right_table_need_probe_dispose =
(empty_block && !p._have_other_join_conjunct && !p._is_mark_join) &&
(p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN ||
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
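// Example: for `t1 LEFT OUTER JOIN t2 ON t1.k = t2.k` with an empty t2 and no
// other conjuncts, every t1 row is emitted padded with nulls, so the probe
// side can skip hash table lookups entirely.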
}
Status HashJoinBuildSinkLocalState::_do_evaluate(vectorized::Block& block,
vectorized::VExprContextSPtrs& exprs,
RuntimeProfile::Counter& expr_call_timer,
std::vector<int>& res_col_ids) {
auto origin_size = block.allocated_bytes();
for (size_t i = 0; i < exprs.size(); ++i) {
int result_col_id = -1;
// execute build column
{
SCOPED_TIMER(&expr_call_timer);
RETURN_IF_ERROR(exprs[i]->execute(&block, &result_col_id));
}
// TODO: optimize the case where the column is const
block.get_by_position(result_col_id).column =
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
res_col_ids[i] = result_col_id;
}
_evaluate_mem_usage = block.allocated_bytes() - origin_size;
return Status::OK();
}
std::vector<uint16_t> HashJoinBuildSinkLocalState::_convert_block_to_null(
vectorized::Block& block) {
std::vector<uint16_t> results;
for (int i = 0; i < block.columns(); ++i) {
if (auto& column_type = block.safe_get_by_position(i); !column_type.type->is_nullable()) {
DCHECK(!column_type.column->is_nullable());
column_type.column = make_nullable(column_type.column);
column_type.type = make_nullable(column_type.type);
results.emplace_back(i);
}
}
return results;
}
Status HashJoinBuildSinkLocalState::_extract_join_column(
vectorized::Block& block, vectorized::ColumnUInt8::MutablePtr& null_map,
vectorized::ColumnRawPtrs& raw_ptrs, const std::vector<int>& res_col_ids) {
DCHECK(_should_build_hash_table);
auto& shared_state = *_shared_state;
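// Null keys have two representations, chosen per column in init():
// 1. serialize the null flag into the key itself (needed so multi-column
//    null-safe-equal keys serialize identically on both sides), which
//    requires a nullable column;
// 2. an external null map shared by all key columns, in which case the
//    nested column is split out of the ColumnNullable.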
for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
const auto* column = block.get_by_position(res_col_ids[i]).column.get();
if (!column->is_nullable() &&
_parent->cast<HashJoinBuildSinkOperatorX>()._serialize_null_into_key[i]) {
_key_columns_holder.emplace_back(
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
raw_ptrs[i] = _key_columns_holder.back().get();
} else if (const auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column);
!_parent->cast<HashJoinBuildSinkOperatorX>()._serialize_null_into_key[i] &&
nullable) {
// Update the null map and split the nested column out of the ColumnNullable
// when serialize_null_into_key is false and the column is nullable.
const auto& col_nested = nullable->get_nested_column();
const auto& col_nullmap = nullable->get_null_map_data();
DCHECK(null_map);
vectorized::VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap);
raw_ptrs[i] = &col_nested;
} else {
raw_ptrs[i] = column;
}
}
return Status::OK();
}
Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
vectorized::Block& block) {
DCHECK(_should_build_hash_table);
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
SCOPED_TIMER(_build_table_timer);
auto rows = (uint32_t)block.rows();
// 1. Handle ColumnString offset overflow.
// 2. Finalize ColumnVariant to speed up subsequent processing.
for (auto& data : block) {
data.column = std::move(*data.column).mutate()->convert_column_if_overflow();
if (p._need_finalize_variant_column) {
std::move(*data.column).mutate()->finalize();
}
}
vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
vectorized::ColumnUInt8::MutablePtr null_map_val;
if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN) {
_convert_block_to_null(block);
// first row is mocked (sentinel row; see the note in get_reserve_mem_size)
for (int i = 0; i < block.columns(); i++) {
auto [column, is_const] = unpack_if_const(block.safe_get_by_position(i).column);
assert_cast<vectorized::ColumnNullable*>(column->assume_mutable().get())
->get_null_map_column()
.get_data()
.data()[0] = 1;
}
}
_set_build_side_has_external_nullmap(block, _build_col_ids);
if (_build_side_has_external_nullmap) {
null_map_val = vectorized::ColumnUInt8::create();
null_map_val->get_data().assign((size_t)rows, (uint8_t)0);
}
// Get the key column that needs to be built
RETURN_IF_ERROR(_extract_join_column(block, null_map_val, raw_ptrs, _build_col_ids));
RETURN_IF_ERROR(_hash_table_init(state, raw_ptrs));
Status st = std::visit(
vectorized::Overload {
[&](std::monostate& arg, auto join_op,
auto short_circuit_for_null_in_build_side,
auto with_other_conjuncts) -> Status {
throw Exception(Status::FatalError("FATAL: uninitialized hash table"));
},
[&](auto&& arg, auto&& join_op, auto short_circuit_for_null_in_build_side,
auto with_other_conjuncts) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
using JoinOpType = std::decay_t<decltype(join_op)>;
ProcessHashTableBuild<HashTableCtxType> hash_table_build_process(
rows, raw_ptrs, this, state->batch_size(), state);
auto st = hash_table_build_process.template run<
JoinOpType::value, short_circuit_for_null_in_build_side,
with_other_conjuncts>(
arg, null_map_val ? &null_map_val->get_data() : nullptr,
&_shared_state->_has_null_in_build_side);
COUNTER_SET(_memory_used_counter,
_build_blocks_memory_usage->value() +
(int64_t)(arg.hash_table->get_byte_size() +
arg.serialized_keys_size(true)));
return st;
}},
_shared_state->hash_table_variant_vector.front()->method_variant,
_shared_state->join_op_variants,
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side),
vectorized::make_bool_variant((p._have_other_join_conjunct)));
return st;
}
void HashJoinBuildSinkLocalState::_set_build_side_has_external_nullmap(
vectorized::Block& block, const std::vector<int>& res_col_ids) {
DCHECK(_should_build_hash_table);
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
if (p._short_circuit_for_null_in_build_side) {
_build_side_has_external_nullmap = true;
return;
}
for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) {
const auto* column = block.get_by_position(res_col_ids[i]).column.get();
if (column->is_nullable() && !p._serialize_null_into_key[i]) {
_build_side_has_external_nullmap = true;
return;
}
}
}
Status HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state,
const vectorized::ColumnRawPtrs& raw_ptrs) {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
std::vector<vectorized::DataTypePtr> data_types;
for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) {
auto& ctx = _build_expr_ctxs[i];
auto data_type = ctx->root()->data_type();
/// For a 'null safe equal' join,
/// the build key column may be converted from non-nullable to nullable.
if (p._serialize_null_into_key[i]) {
data_types.emplace_back(vectorized::make_nullable(data_type));
} else {
// In this case, the null map is used to represent null values.
data_types.emplace_back(vectorized::remove_nullable(data_type));
}
}
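// These key types must line up with the raw pointers produced by
// _extract_join_column(): nullable when null is serialized into the key,
// the non-nullable nested type when the external null map is used.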
if (_build_expr_ctxs.size() == 1) {
p._should_keep_hash_key_column = true;
}
std::vector<std::shared_ptr<JoinDataVariants>> variant_ptrs;
if (p._is_broadcast_join && p._use_shared_hash_table) {
variant_ptrs = _shared_state->hash_table_variant_vector;
} else {
variant_ptrs.emplace_back(
_shared_state->hash_table_variant_vector[p._use_shared_hash_table ? _task_idx : 0]);
}
for (auto& variant_ptr : variant_ptrs) {
RETURN_IF_ERROR(init_hash_method<JoinDataVariants>(variant_ptr.get(), data_types, true));
}
std::visit([&](auto&& arg) { try_convert_to_direct_mapping(&arg, raw_ptrs, variant_ptrs); },
variant_ptrs[0]->method_variant);
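// Judging by its name, try_convert_to_direct_mapping may swap the generic
// hash method for a direct-addressed table when the key type allows it;
// the exact conditions live in the helper itself.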
return Status::OK();
}
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id,
int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs)
: JoinBuildSinkOperatorX(pool, operator_id, dest_id, tnode, descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
tnode.hash_join_node.is_broadcast_join),
_partition_exprs(tnode.__isset.distribute_expr_lists && !_is_broadcast_join
? tnode.distribute_expr_lists[1]
: std::vector<TExpr> {}) {}
Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state));
DCHECK(tnode.__isset.hash_join_node);
if (tnode.hash_join_node.__isset.hash_output_slot_ids) {
_hash_output_slot_ids = tnode.hash_join_node.hash_output_slot_ids;
}
const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts;
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
vectorized::VExprContextSPtr build_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.right, build_ctx));
{
// for type check
vectorized::VExprContextSPtr probe_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, probe_ctx));
auto build_side_expr_type = build_ctx->root()->data_type();
auto probe_side_expr_type = probe_ctx->root()->data_type();
if (!vectorized::make_nullable(build_side_expr_type)
->equals(*vectorized::make_nullable(probe_side_expr_type))) {
return Status::InternalError(
"build side type {}, not match probe side type {} , node info "
"{}",
build_side_expr_type->get_name(), probe_side_expr_type->get_name(),
this->debug_string(0));
}
}
_build_expr_ctxs.push_back(build_ctx);
const auto vexpr = _build_expr_ctxs.back()->root();
/// Null safe equal means null = null evaluates to true; the corresponding SQL operator is <=>.
const bool is_null_safe_equal =
eq_join_conjunct.__isset.opcode &&
(eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL) &&
// For a null safe equal join, the FE may generate a plan where
// both sides of the conjunct are non-nullable; we just treat it
// as a normal equal join conjunct.
(eq_join_conjunct.right.nodes[0].is_nullable ||
eq_join_conjunct.left.nodes[0].is_nullable);
_is_null_safe_eq_join.push_back(is_null_safe_equal);
if (eq_join_conjuncts.size() == 1) {
// A single-column key must use the null map to represent null instead of serializing null into the key.
_serialize_null_into_key.emplace_back(false);
} else if (is_null_safe_equal) {
// Serialize null into the key to represent null values across multiple key columns.
_serialize_null_into_key.emplace_back(true);
} else {
// Under normal conditions, since null != null, nulls can be represented directly with the null map.
_serialize_null_into_key.emplace_back(false);
}
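// For example, with `ON t1.k1 <=> t2.k1 AND t1.k2 = t2.k2` there are two
// conjuncts, so the null-safe k1 column gets _serialize_null_into_key = true
// and (NULL, 1) serializes identically on both sides and can match.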
}
return Status::OK();
}
Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::prepare(state));
_use_shared_hash_table =
_is_broadcast_join && state->enable_share_hash_table_for_broadcast_join();
auto init_keep_column_flags = [&](auto& tuple_descs, auto& output_slot_flags) {
for (const auto& tuple_desc : tuple_descs) {
for (const auto& slot_desc : tuple_desc->slots()) {
output_slot_flags.emplace_back(
std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(),
slot_desc->id()) != _hash_output_slot_ids.end());
if (output_slot_flags.back() &&
slot_desc->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
_need_finalize_variant_column = true;
}
}
}
};
init_keep_column_flags(row_desc().tuple_descriptors(), _should_keep_column_flags);
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc()));
return vectorized::VExpr::open(_build_expr_ctxs, state);
}
Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
if (local_state._should_build_hash_table) {
// If eos has been reached, or a null value has already been met with the short-circuit
// strategy, we do not need to pull data from the probe side.
if (local_state._build_side_mutable_block.empty()) {
auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename(
_child->row_desc());
tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false));
local_state._build_col_ids.resize(_build_expr_ctxs.size());
RETURN_IF_ERROR(local_state._do_evaluate(tmp_build_block, local_state._build_expr_ctxs,
*local_state._build_expr_call_timer,
local_state._build_col_ids));
local_state._build_side_mutable_block =
vectorized::MutableBlock::build_mutable_block(&tmp_build_block);
}
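// The one-row block created above is the mocked sentinel row referenced in
// process_build_block(); running _do_evaluate() on it also fills in
// _build_col_ids before any real data arrives.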
if (!in_block->empty()) {
std::vector<int> res_col_ids(_build_expr_ctxs.size());
RETURN_IF_ERROR(local_state._do_evaluate(*in_block, local_state._build_expr_ctxs,
*local_state._build_expr_call_timer,
res_col_ids));
local_state._build_side_rows += in_block->rows();
if (local_state._build_side_rows > std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported(
"Hash join does not support build table rows over {}; you should enable "
"join spill to avoid this issue",
std::to_string(std::numeric_limits<uint32_t>::max()));
}
SCOPED_TIMER(local_state._build_side_merge_block_timer);
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge_ignore_overflow(
std::move(*in_block)));
int64_t blocks_mem_usage = local_state._build_side_mutable_block.allocated_bytes();
COUNTER_SET(local_state._memory_used_counter, blocks_mem_usage);
COUNTER_SET(local_state._build_blocks_memory_usage, blocks_mem_usage);
}
}
if (local_state._should_build_hash_table && eos) {
DCHECK(!local_state._build_side_mutable_block.empty());
local_state._shared_state->build_block = std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
state, local_state._shared_state->build_block->rows(),
local_state._finish_dependency));
RETURN_IF_ERROR(
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
local_state.init_short_circuit_for_probe();
} else if (!local_state._should_build_hash_table) {
// An instance that does not build the hash table should wait for the signal that the
// hash table build has finished. But if it is running and signaled == false, the source
// operator may have been closed by some short circuit; returning EOF marks the task as
// wake_up_early.
// TODO: remove signaled once we can guarantee that wake_up_early is always set accurately.
if (!_signaled || local_state._terminated) {
return Status::Error<ErrorCode::END_OF_FILE>("source has closed");
}
DCHECK_LT(local_state._task_idx,
local_state._shared_state->hash_table_variant_vector.size());
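// Non-building instances copy the shared hash table pointer from the
// builder's variant (front()) into their own task-indexed slot. The variant
// alternatives match because the builder initialized every slot of the
// vector in _hash_table_init() when the shared table is enabled.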
std::visit(
[](auto&& dst, auto&& src) {
if constexpr (!std::is_same_v<std::monostate, std::decay_t<decltype(dst)>> &&
std::is_same_v<std::decay_t<decltype(src)>,
std::decay_t<decltype(dst)>>) {
dst.hash_table = src.hash_table;
} else {
throw Exception(Status::InternalError(
"Hash table type mismatch when share hash table"));
}
},
local_state._shared_state->hash_table_variant_vector[local_state._task_idx]
->method_variant,
local_state._shared_state->hash_table_variant_vector.front()->method_variant);
}
if (eos) {
// If a shared hash table is used, state is shared by all tasks.
// Sink and source have an n-n relationship when a shared hash table is used, otherwise 1-1.
// So we should notify the `_task_idx`-th source task when a shared hash table is used.
local_state._dependency->set_ready_to_read(_use_shared_hash_table ? local_state._task_idx
: 0);
}
return Status::OK();
}
size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) {
auto& local_state = get_local_state(state);
return local_state.get_reserve_mem_size(state, eos);
}
size_t HashJoinBuildSinkOperatorX::get_memory_usage(RuntimeState* state) const {
auto& local_state = get_local_state(state);
return local_state._memory_used_counter->value();
}
std::string HashJoinBuildSinkOperatorX::get_memory_usage_debug_str(RuntimeState* state) const {
auto& local_state = get_local_state(state);
return fmt::format("build block: {}, hash table: {}, build key arena: {}",
PrettyPrinter::print_bytes(local_state._build_blocks_memory_usage->value()),
PrettyPrinter::print_bytes(local_state._hash_table_memory_usage->value()),
PrettyPrinter::print_bytes(local_state._build_arena_memory_usage->value()));
}
} // namespace doris::pipeline