| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "hashjoin_probe_operator.h" |
| |
| #include <gen_cpp/PlanNodes_types.h> |
| |
| #include <string> |
| |
| #include "common/cast_set.h" |
| #include "common/logging.h" |
| #include "pipeline/exec/operator.h" |
| #include "runtime/descriptors.h" |
| #include "vec/common/assert_cast.h" |
| #include "vec/data_types/data_type_nullable.h" |
| |
| namespace doris::pipeline { |
| #include "common/compile_check_begin.h" |
| HashJoinProbeLocalState::HashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent) |
| : JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>(state, parent), |
| _process_hashtable_ctx_variants(std::make_unique<HashTableCtxVariants>()) {} |
| |
| Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { |
| RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_init_timer); |
| _task_idx = info.task_idx; |
| auto& p = _parent->cast<HashJoinProbeOperatorX>(); |
| _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); |
| for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) { |
| RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); |
| } |
| _other_join_conjuncts.resize(p._other_join_conjuncts.size()); |
| for (size_t i = 0; i < _other_join_conjuncts.size(); i++) { |
| RETURN_IF_ERROR(p._other_join_conjuncts[i]->clone(state, _other_join_conjuncts[i])); |
| } |
| |
| _mark_join_conjuncts.resize(p._mark_join_conjuncts.size()); |
| for (size_t i = 0; i < _mark_join_conjuncts.size(); i++) { |
| RETURN_IF_ERROR(p._mark_join_conjuncts[i]->clone(state, _mark_join_conjuncts[i])); |
| } |
| |
| _construct_mutable_join_block(); |
| _probe_column_disguise_null.reserve(_probe_expr_ctxs.size()); |
| _probe_arena_memory_usage = custom_profile()->AddHighWaterMarkCounter( |
| "MemoryUsageProbeKeyArena", TUnit::BYTES, "", 1); |
| // Probe phase |
| _probe_expr_call_timer = ADD_TIMER(custom_profile(), "ProbeExprCallTime"); |
| _search_hashtable_timer = ADD_TIMER(custom_profile(), "ProbeWhenSearchHashTableTime"); |
| _build_side_output_timer = ADD_TIMER(custom_profile(), "ProbeWhenBuildSideOutputTime"); |
| _probe_side_output_timer = ADD_TIMER(custom_profile(), "ProbeWhenProbeSideOutputTime"); |
| _non_equal_join_conjuncts_timer = |
| ADD_TIMER(custom_profile(), "NonEqualJoinConjunctEvaluationTime"); |
| _init_probe_side_timer = ADD_TIMER(custom_profile(), "InitProbeSideTime"); |
| return Status::OK(); |
| } |
| |
| Status HashJoinProbeLocalState::open(RuntimeState* state) { |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_open_timer); |
| RETURN_IF_ERROR(JoinProbeLocalState::open(state)); |
| |
| auto& p = _parent->cast<HashJoinProbeOperatorX>(); |
| Status res; |
| std::visit( |
| [&](auto&& join_op_variants, auto have_other_join_conjunct) { |
| using JoinOpType = std::decay_t<decltype(join_op_variants)>; |
| if constexpr (JoinOpType::value == TJoinOp::CROSS_JOIN) { |
| res = Status::InternalError("hash join do not support cross join"); |
| } else { |
| _process_hashtable_ctx_variants |
| ->emplace<ProcessHashTableProbe<JoinOpType::value>>( |
| this, state->batch_size()); |
| } |
| }, |
| _shared_state->join_op_variants, |
| vectorized::make_bool_variant(p._have_other_join_conjunct)); |
| return res; |
| } |
| |
| void HashJoinProbeLocalState::prepare_for_next() { |
| _probe_index = 0; |
| _build_index = 0; |
| _ready_probe = false; |
| _last_probe_match = -1; |
| _last_probe_null_mark = -1; |
| _prepare_probe_block(); |
| } |
| |
| Status HashJoinProbeLocalState::close(RuntimeState* state) { |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_close_timer); |
| if (_closed) { |
| return Status::OK(); |
| } |
| if (_process_hashtable_ctx_variants) { |
| std::visit(vectorized::Overload {[&](std::monostate&) {}, |
| [&](auto&& process_hashtable_ctx) { |
| if (process_hashtable_ctx._arena) { |
| process_hashtable_ctx._arena.reset(); |
| } |
| }}, |
| *_process_hashtable_ctx_variants); |
| } |
| _process_hashtable_ctx_variants = nullptr; |
| _null_map_column = nullptr; |
| _probe_block.clear(); |
| return JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>::close(state); |
| } |
| |
| bool HashJoinProbeLocalState::_need_probe_null_map(vectorized::Block& block, |
| const std::vector<int>& res_col_ids) { |
| for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { |
| const auto* column = block.get_by_position(res_col_ids[i]).column.get(); |
| if (column->is_nullable() && |
| !_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void HashJoinProbeLocalState::_prepare_probe_block() { |
| // clear_column_data of _probe_block |
| if (!_probe_column_disguise_null.empty()) { |
| for (int i = 0; i < _probe_column_disguise_null.size(); ++i) { |
| auto column_to_erase = _probe_column_disguise_null[i]; |
| _probe_block.erase(column_to_erase - i); |
| } |
| _probe_column_disguise_null.clear(); |
| } |
| |
| // remove add nullmap of probe columns |
| for (auto index : _probe_column_convert_to_null) { |
| auto& column_type = _probe_block.safe_get_by_position(index); |
| DCHECK(column_type.column->is_nullable() || is_column_const(*(column_type.column.get()))); |
| DCHECK(column_type.type->is_nullable()); |
| |
| column_type.column = remove_nullable(column_type.column); |
| column_type.type = remove_nullable(column_type.type); |
| } |
| _key_columns_holder.clear(); |
| _probe_block.clear_column_data(_parent->get_child()->row_desc().num_materialized_slots()); |
| } |
| |
| HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, |
| int operator_id, const DescriptorTbl& descs) |
| : JoinProbeOperatorX<HashJoinProbeLocalState>(pool, tnode, operator_id, descs), |
| _join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type |
| : TJoinDistributionType::NONE), |
| _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join && |
| tnode.hash_join_node.is_broadcast_join), |
| _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids |
| ? tnode.hash_join_node.hash_output_slot_ids |
| : std::vector<SlotId> {}), |
| _partition_exprs(tnode.__isset.distribute_expr_lists && !_is_broadcast_join |
| ? tnode.distribute_expr_lists[0] |
| : std::vector<TExpr> {}) {} |
| |
| Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block, |
| bool* eos) const { |
| auto& local_state = get_local_state(state); |
| if (local_state._shared_state->short_circuit_for_probe) { |
| // If we use a short-circuit strategy, should return empty block directly. |
| *eos = true; |
| return Status::OK(); |
| } |
| |
| //TODO: this short circuit maybe could refactor, no need to check at here. |
| if (local_state.empty_right_table_shortcut()) { |
| // when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN |
| // we could get the result is probe table + null-column(if need output) |
| // If we use a short-circuit strategy, should return block directly by add additional null data. |
| auto block_rows = local_state._probe_block.rows(); |
| if (local_state._probe_eos && block_rows == 0) { |
| *eos = true; |
| return Status::OK(); |
| } |
| |
| //create build side null column, if need output |
| for (int i = 0; |
| (_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) { |
| auto type = remove_nullable(_right_table_data_types[i]); |
| auto column = type->create_column(); |
| column->resize(block_rows); |
| auto null_map_column = vectorized::ColumnUInt8::create(block_rows, 1); |
| auto nullable_column = vectorized::ColumnNullable::create(std::move(column), |
| std::move(null_map_column)); |
| local_state._probe_block.insert({std::move(nullable_column), make_nullable(type), |
| _right_table_column_names[i]}); |
| } |
| |
| /// No need to check the block size in `_filter_data_and_build_output` because here dose not |
| /// increase the output rows count(just same as `_probe_block`'s rows count). |
| RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos, |
| &local_state._probe_block, false)); |
| local_state._probe_block.clear_column_data(_child->row_desc().num_materialized_slots()); |
| return Status::OK(); |
| } |
| |
| local_state._join_block.clear_column_data(); |
| |
| vectorized::MutableBlock mutable_join_block(&local_state._join_block); |
| vectorized::Block temp_block; |
| |
| Status st; |
| if (local_state._probe_index < local_state._probe_block.rows()) { |
| DCHECK(local_state._has_set_need_null_map_for_probe); |
| std::visit( |
| [&](auto&& arg, auto&& process_hashtable_ctx) { |
| using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>; |
| if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) { |
| using HashTableCtxType = std::decay_t<decltype(arg)>; |
| if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { |
| st = process_hashtable_ctx.process( |
| arg, |
| local_state._null_map_column |
| ? local_state._null_map_column->get_data().data() |
| : nullptr, |
| mutable_join_block, &temp_block, |
| cast_set<uint32_t>(local_state._probe_block.rows()), |
| _is_mark_join); |
| } else { |
| st = Status::InternalError("uninited hash table"); |
| } |
| } else { |
| st = Status::InternalError("uninited hash table probe"); |
| } |
| }, |
| local_state._shared_state->hash_table_variant_vector.size() == 1 |
| ? local_state._shared_state->hash_table_variant_vector[0]->method_variant |
| : local_state._shared_state |
| ->hash_table_variant_vector[local_state._task_idx] |
| ->method_variant, |
| *local_state._process_hashtable_ctx_variants); |
| } else if (local_state._probe_eos) { |
| if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { |
| std::visit( |
| [&](auto&& arg, auto&& process_hashtable_ctx) { |
| using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>; |
| if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) { |
| using HashTableCtxType = std::decay_t<decltype(arg)>; |
| if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { |
| st = process_hashtable_ctx.finish_probing( |
| arg, mutable_join_block, &temp_block, eos, _is_mark_join); |
| } else { |
| st = Status::InternalError("uninited hash table"); |
| } |
| } else { |
| st = Status::InternalError("uninited hash table probe"); |
| } |
| }, |
| local_state._shared_state->hash_table_variant_vector.size() == 1 |
| ? local_state._shared_state->hash_table_variant_vector[0] |
| ->method_variant |
| : local_state._shared_state |
| ->hash_table_variant_vector[local_state._task_idx] |
| ->method_variant, |
| *local_state._process_hashtable_ctx_variants); |
| } else { |
| *eos = true; |
| return Status::OK(); |
| } |
| } else { |
| return Status::OK(); |
| } |
| if (!st) { |
| return st; |
| } |
| |
| local_state._estimate_memory_usage += temp_block.allocated_bytes(); |
| RETURN_IF_ERROR( |
| local_state.filter_data_and_build_output(state, output_block, eos, &temp_block)); |
| // Here make _join_block release the columns' ptr |
| local_state._join_block.set_columns(local_state._join_block.clone_empty_columns()); |
| mutable_join_block.clear(); |
| return Status::OK(); |
| } |
| |
| std::string HashJoinProbeLocalState::debug_string(int indentation_level) const { |
| fmt::memory_buffer debug_string_buffer; |
| fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}", |
| JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>::debug_string( |
| indentation_level), |
| _shared_state ? std::to_string(_shared_state->short_circuit_for_probe) : "NULL"); |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block, |
| const std::vector<int>& res_col_ids) { |
| if (empty_right_table_shortcut()) { |
| return Status::OK(); |
| } |
| |
| _probe_columns.resize(_probe_expr_ctxs.size()); |
| |
| if (!_has_set_need_null_map_for_probe) { |
| _has_set_need_null_map_for_probe = true; |
| _need_null_map_for_probe = _need_probe_null_map(block, res_col_ids); |
| } |
| if (_need_null_map_for_probe) { |
| if (!_null_map_column) { |
| _null_map_column = vectorized::ColumnUInt8::create(); |
| } |
| _null_map_column->get_data().assign(block.rows(), (uint8_t)0); |
| } |
| |
| auto& shared_state = *_shared_state; |
| for (size_t i = 0; i < shared_state.build_exprs_size; ++i) { |
| const auto* column = block.get_by_position(res_col_ids[i]).column.get(); |
| if (!column->is_nullable() && |
| _parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) { |
| _key_columns_holder.emplace_back( |
| vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column)); |
| _probe_columns[i] = _key_columns_holder.back().get(); |
| } else if (const auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column); |
| nullable && |
| !_parent->cast<HashJoinProbeOperatorX>()._serialize_null_into_key[i]) { |
| // update nulllmap and split nested out of ColumnNullable when serialize_null_into_key is false and column is nullable |
| const auto& col_nested = nullable->get_nested_column(); |
| const auto& col_nullmap = nullable->get_null_map_data(); |
| DCHECK(_null_map_column); |
| vectorized::VectorizedUtils::update_null_map(_null_map_column->get_data(), col_nullmap); |
| _probe_columns[i] = &col_nested; |
| } else { |
| _probe_columns[i] = column; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| std::vector<uint16_t> HashJoinProbeLocalState::_convert_block_to_null(vectorized::Block& block) { |
| std::vector<uint16_t> results; |
| for (int i = 0; i < block.columns(); ++i) { |
| if (auto& column_type = block.safe_get_by_position(i); !column_type.type->is_nullable()) { |
| DCHECK(!column_type.column->is_nullable()); |
| column_type.column = make_nullable(column_type.column); |
| column_type.type = make_nullable(column_type.type); |
| results.emplace_back(i); |
| } |
| } |
| return results; |
| } |
| |
| Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state, |
| vectorized::Block* output_block, |
| bool* eos, |
| vectorized::Block* temp_block, |
| bool check_rows_count) { |
| auto output_rows = temp_block->rows(); |
| if (check_rows_count) { |
| DCHECK(output_rows <= state->batch_size()); |
| } |
| { |
| SCOPED_TIMER(_join_filter_timer); |
| RETURN_IF_ERROR(filter_block(_conjuncts, temp_block, temp_block->columns())); |
| } |
| |
| RETURN_IF_ERROR(_build_output_block(temp_block, output_block)); |
| reached_limit(output_block, eos); |
| return Status::OK(); |
| } |
| |
| bool HashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const { |
| auto& local_state = state->get_local_state(operator_id())->cast<HashJoinProbeLocalState>(); |
| return (local_state._probe_block.rows() == 0 || |
| local_state._probe_index == local_state._probe_block.rows()) && |
| !local_state._probe_eos && !local_state._shared_state->short_circuit_for_probe; |
| } |
| |
| Status HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block, |
| vectorized::VExprContextSPtrs& exprs, |
| RuntimeProfile::Counter& expr_call_timer, |
| std::vector<int>& res_col_ids) const { |
| for (size_t i = 0; i < exprs.size(); ++i) { |
| int result_col_id = -1; |
| // execute build column |
| { |
| SCOPED_TIMER(&expr_call_timer); |
| RETURN_IF_ERROR(exprs[i]->execute(&block, &result_col_id)); |
| } |
| |
| // TODO: opt the column is const |
| block.get_by_position(result_col_id).column = |
| block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); |
| res_col_ids[i] = result_col_id; |
| } |
| return Status::OK(); |
| } |
| |
| Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block, |
| bool eos) const { |
| auto& local_state = get_local_state(state); |
| local_state.prepare_for_next(); |
| local_state._probe_eos = eos; |
| |
| const auto rows = input_block->rows(); |
| size_t origin_size = input_block->allocated_bytes(); |
| |
| if (rows > 0) { |
| COUNTER_UPDATE(local_state._probe_rows_counter, rows); |
| std::vector<int> res_col_ids(local_state._probe_expr_ctxs.size()); |
| RETURN_IF_ERROR(_do_evaluate(*input_block, local_state._probe_expr_ctxs, |
| *local_state._probe_expr_call_timer, res_col_ids)); |
| if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { |
| local_state._probe_column_convert_to_null = |
| local_state._convert_block_to_null(*input_block); |
| } |
| |
| RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_col_ids)); |
| |
| local_state._estimate_memory_usage += (input_block->allocated_bytes() - origin_size); |
| |
| if (&local_state._probe_block != input_block) { |
| input_block->swap(local_state._probe_block); |
| COUNTER_SET(local_state._memory_used_counter, |
| (int64_t)local_state._probe_block.allocated_bytes()); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { |
| RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::init(tnode, state)); |
| DCHECK(tnode.__isset.hash_join_node); |
| const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts; |
| for (const auto& eq_join_conjunct : eq_join_conjuncts) { |
| vectorized::VExprContextSPtr ctx; |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, ctx)); |
| _probe_expr_ctxs.push_back(ctx); |
| |
| /// null safe equal means null = null is true, the operator in SQL should be: <=>. |
| const bool is_null_safe_equal = |
| eq_join_conjunct.__isset.opcode && |
| (eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL) && |
| // For a null safe equal join, FE may generate a plan that |
| // both sides of the conjuct are not nullable, we just treat it |
| // as a normal equal join conjunct. |
| (eq_join_conjunct.right.nodes[0].is_nullable || |
| eq_join_conjunct.left.nodes[0].is_nullable); |
| |
| if (eq_join_conjuncts.size() == 1) { |
| // single column key serialize method must use nullmap for represent null to instead serialize null into key |
| _serialize_null_into_key.emplace_back(false); |
| } else if (is_null_safe_equal) { |
| // use serialize null into key to represent multi column null value |
| _serialize_null_into_key.emplace_back(true); |
| } else { |
| // on normal conditions, because null!=null, it can be expressed directly with nullmap. |
| _serialize_null_into_key.emplace_back(false); |
| } |
| } |
| |
| if (tnode.hash_join_node.__isset.other_join_conjuncts && |
| !tnode.hash_join_node.other_join_conjuncts.empty()) { |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( |
| tnode.hash_join_node.other_join_conjuncts, _other_join_conjuncts)); |
| |
| DCHECK(!_build_unique); |
| DCHECK(_have_other_join_conjunct); |
| } else if (tnode.hash_join_node.__isset.vother_join_conjunct) { |
| _other_join_conjuncts.resize(1); |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( |
| tnode.hash_join_node.vother_join_conjunct, _other_join_conjuncts[0])); |
| |
| // If LEFT SEMI JOIN/LEFT ANTI JOIN with not equal predicate, |
| // build table should not be deduplicated. |
| DCHECK(!_build_unique); |
| DCHECK(_have_other_join_conjunct); |
| } |
| |
| if (tnode.hash_join_node.__isset.mark_join_conjuncts && |
| !tnode.hash_join_node.mark_join_conjuncts.empty()) { |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( |
| tnode.hash_join_node.mark_join_conjuncts, _mark_join_conjuncts)); |
| DCHECK(_is_mark_join); |
| |
| /// We make mark join conjuncts as equal conjuncts for null aware join, |
| /// so `_mark_join_conjuncts` should be empty if this is null aware join. |
| DCHECK_EQ(_mark_join_conjuncts.empty(), |
| _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || |
| _join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { |
| RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state)); |
| // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need |
| // insert to output block of hash join. |
| // _left_output_slots_flags : column of left table need to output set flag = true |
| // _rgiht_output_slots_flags : column of right table need to output set flag = true |
| // if _hash_output_slot_ids is empty, means all column of left/right table need to output. |
| auto init_output_slots_flags = [&](auto& tuple_descs, auto& output_slot_flags, |
| bool init_finalize_flag = false) { |
| for (const auto& tuple_desc : tuple_descs) { |
| for (const auto& slot_desc : tuple_desc->slots()) { |
| output_slot_flags.emplace_back( |
| std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(), |
| slot_desc->id()) != _hash_output_slot_ids.end()); |
| if (init_finalize_flag && output_slot_flags.back() && |
| slot_desc->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { |
| _need_finalize_variant_column = true; |
| } |
| } |
| } |
| }; |
| init_output_slots_flags(_child->row_desc().tuple_descriptors(), _left_output_slot_flags, true); |
| init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(), |
| _right_output_slot_flags); |
| // _other_join_conjuncts are evaluated in the context of the rows produced by this node |
| for (auto& conjunct : _other_join_conjuncts) { |
| RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); |
| conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids); |
| } |
| |
| for (auto& conjunct : _mark_join_conjuncts) { |
| RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); |
| conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids); |
| } |
| |
| RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); |
| DCHECK(_build_side_child != nullptr); |
| // right table data types |
| _right_table_data_types = |
| vectorized::VectorizedUtils::get_data_types(_build_side_child->row_desc()); |
| _left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child->row_desc()); |
| _right_table_column_names = |
| vectorized::VectorizedUtils::get_column_names(_build_side_child->row_desc()); |
| |
| std::vector<const SlotDescriptor*> slots_to_check; |
| for (const auto& tuple_descriptor : _intermediate_row_desc->tuple_descriptors()) { |
| for (const auto& slot : tuple_descriptor->slots()) { |
| slots_to_check.emplace_back(slot); |
| } |
| } |
| |
| if (_is_mark_join) { |
| const auto* last_one = slots_to_check.back(); |
| slots_to_check.pop_back(); |
| auto data_type = last_one->get_data_type_ptr(); |
| if (!data_type->is_nullable()) { |
| return Status::InternalError( |
| "The last column for mark join should be Nullable(UInt8), not {}", |
| data_type->get_name()); |
| } |
| |
| const auto& null_data_type = assert_cast<const vectorized::DataTypeNullable&>(*data_type); |
| if (null_data_type.get_nested_type()->get_primitive_type() != PrimitiveType::TYPE_BOOLEAN) { |
| return Status::InternalError( |
| "The last column for mark join should be Nullable(UInt8), not {}", |
| data_type->get_name()); |
| } |
| } |
| |
| _right_col_idx = (_is_right_semi_anti && !_have_other_join_conjunct && |
| (!_is_mark_join || _mark_join_conjuncts.empty())) |
| ? 0 |
| : _left_table_data_types.size(); |
| |
| size_t idx = 0; |
| for (const auto* slot : slots_to_check) { |
| auto data_type = slot->get_data_type_ptr(); |
| const auto slot_on_left = idx < _right_col_idx; |
| |
| if (slot_on_left) { |
| if (idx >= _left_table_data_types.size()) { |
| return Status::InternalError( |
| "Join node(id={}, OP={}) intermediate slot({}, #{})'s on left table " |
| "idx out bound of _left_table_data_types: {} vs {}", |
| _node_id, _join_op, slot->col_name(), slot->id(), idx, |
| _left_table_data_types.size()); |
| } |
| } else if (idx - _right_col_idx >= _right_table_data_types.size()) { |
| return Status::InternalError( |
| "Join node(id={}, OP={}) intermediate slot({}, #{})'s on right table " |
| "idx out bound of _right_table_data_types: {} vs {}(idx = {}, _right_col_idx = " |
| "{})", |
| _node_id, _join_op, slot->col_name(), slot->id(), idx - _right_col_idx, |
| _right_table_data_types.size(), idx, _right_col_idx); |
| } |
| |
| auto target_data_type = slot_on_left ? _left_table_data_types[idx] |
| : _right_table_data_types[idx - _right_col_idx]; |
| ++idx; |
| if (data_type->equals(*target_data_type)) { |
| continue; |
| } |
| |
| /// For outer join(left/right/full), the non-nullable columns may be converted to nullable. |
| const auto accept_nullable_not_match = |
| _join_op == TJoinOp::FULL_OUTER_JOIN || |
| (slot_on_left ? _join_op == TJoinOp::RIGHT_OUTER_JOIN |
| : _join_op == TJoinOp::LEFT_OUTER_JOIN); |
| |
| if (accept_nullable_not_match) { |
| auto data_type_non_nullable = vectorized::remove_nullable(data_type); |
| if (data_type_non_nullable->equals(*target_data_type)) { |
| continue; |
| } |
| } else if (data_type->equals(*target_data_type)) { |
| continue; |
| } |
| |
| return Status::InternalError( |
| "Join node(id={}, OP={}) intermediate slot({}, #{})'s on {} table data type not " |
| "match: '{}' vs '{}'", |
| _node_id, _join_op, slot->col_name(), slot->id(), (slot_on_left ? "left" : "right"), |
| data_type->get_name(), target_data_type->get_name()); |
| } |
| |
| _build_side_child.reset(); |
| RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state)); |
| for (auto& conjunct : _other_join_conjuncts) { |
| RETURN_IF_ERROR(conjunct->open(state)); |
| } |
| |
| for (auto& conjunct : _mark_join_conjuncts) { |
| RETURN_IF_ERROR(conjunct->open(state)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| } // namespace doris::pipeline |