| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include "exec/operator/join_build_sink_operator.h" |
| #include "exec/operator/operator.h" |
| #include "exec/runtime_filter/runtime_filter_producer_helper.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class HashJoinBuildSinkOperatorX; |
| |
| // Local state of the hash join build sink operator. |
| // |
| // Declares the sink lifecycle hooks (init/open/terminate/close), the entry points |
| // used to process build-side blocks, and the profile counters that the build path |
| // updates. HashJoinBuildSinkOperatorX, PartitionedHashJoinSinkLocalState and |
| // ProcessHashTableBuild are friends and may touch the protected members directly. |
| class HashJoinBuildSinkLocalState MOCK_REMOVE(final) |
|         : public JoinBuildSinkLocalState<HashJoinSharedState, HashJoinBuildSinkLocalState> { |
| public: |
|     ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState); |
|     using Base = JoinBuildSinkLocalState<HashJoinSharedState, HashJoinBuildSinkLocalState>; |
|     using Parent = HashJoinBuildSinkOperatorX; |
|     HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); |
|     ~HashJoinBuildSinkLocalState() override = default; |
| |
|     // Lifecycle hooks overridden from the base local state. |
|     Status init(RuntimeState* state, LocalSinkStateInfo& info) override; |
|     Status open(RuntimeState* state) override; |
|     Status terminate(RuntimeState* state) override; |
| |
|     // Processes the given build-side block; returns a non-OK Status on failure. |
|     Status process_build_block(RuntimeState* state, Block& block); |
| |
|     // Build ASOF JOIN pre-sorted index for O(log K) lookup |
|     Status build_asof_index(Block& block); |
| |
|     void init_short_circuit_for_probe(); |
| |
|     bool build_unique() const; |
| |
|     // Exposes the finish dependency owned by this local state (non-owning pointer). |
|     Dependency* finishdependency() override { return _finish_dependency.get(); } |
| |
|     Status close(RuntimeState* state, Status exec_status) override; |
| |
|     // The result must not be ignored by the caller ([[nodiscard]]). |
|     [[nodiscard]] MOCK_FUNCTION size_t get_reserve_mem_size(RuntimeState* state, bool eos); |
| |
| protected: |
|     Status _hash_table_init(RuntimeState* state, const ColumnRawPtrs& raw_ptrs); |
|     void _set_build_side_has_external_nullmap(Block& block, const std::vector<int>& res_col_ids); |
|     Status _do_evaluate(Block& block, VExprContextSPtrs& exprs, |
|                         RuntimeProfile::Counter& expr_call_timer, std::vector<int>& res_col_ids); |
|     std::vector<uint16_t> _convert_block_to_null(Block& block); |
|     Status _extract_join_column(Block& block, ColumnUInt8::MutablePtr& null_map, |
|                                 ColumnRawPtrs& raw_ptrs, const std::vector<int>& res_col_ids); |
|     friend class HashJoinBuildSinkOperatorX; |
|     friend class PartitionedHashJoinSinkLocalState; |
|     template <class HashTableContext> |
|     friend struct ProcessHashTableBuild; |
| |
|     // build expr |
|     VExprContextSPtrs _build_expr_ctxs; |
|     std::vector<ColumnPtr> _key_columns_holder; |
| |
|     bool _should_build_hash_table = true; |
| |
|     size_t _evaluate_mem_usage = 0; |
|     size_t _build_side_rows = 0; |
|     // Default-initialized like the neighbouring scalar members so that it never holds an |
|     // indeterminate value. The real index is assigned outside this header |
|     // (presumably in init() -- confirm against the .cpp). |
|     int _task_idx = 0; |
| |
|     MutableBlock _build_side_mutable_block; |
|     std::shared_ptr<RuntimeFilterProducerHelper> _runtime_filter_producer_helper; |
| |
|     /* |
|      * The comparison result of a null value with any other value is null, |
|      * which means that for most join(exclude: null aware join, null equal safe join), |
|      * the result of an equality condition involving null should be false, |
|      * so null does not need to be added to the hash table. |
|      */ |
|     bool _build_side_has_external_nullmap = false; |
|     std::vector<int> _build_col_ids; |
|     std::shared_ptr<CountedFinishDependency> _finish_dependency; |
| |
|     // Build timers |
|     RuntimeProfile::Counter* _build_table_timer = nullptr; |
|     RuntimeProfile::Counter* _build_expr_call_timer = nullptr; |
|     RuntimeProfile::Counter* _build_table_insert_timer = nullptr; |
|     RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr; |
| |
|     // Memory usage counters |
|     RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr; |
|     RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; |
|     RuntimeProfile::Counter* _build_arena_memory_usage = nullptr; |
| |
|     // ASOF index build counters |
|     RuntimeProfile::Counter* _asof_index_total_timer = nullptr; |
|     RuntimeProfile::Counter* _asof_index_expr_timer = nullptr; |
|     RuntimeProfile::Counter* _asof_index_sort_timer = nullptr; |
|     RuntimeProfile::Counter* _asof_index_group_timer = nullptr; |
| }; |
| |
| // Sink-side operator of the hash join build. |
| // |
| // Holds the plan-level configuration (join distribution, build expressions, |
| // null handling flags, ASOF settings) and answers the data-distribution queries. |
| // Functions without a body here are only declared in this header. |
| class HashJoinBuildSinkOperatorX MOCK_REMOVE(final) |
|         : public JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState> { |
| public: |
|     HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, |
|                                const TPlanNode& tnode, const DescriptorTbl& descs); |
| |
|     // Initialization from a TDataSink is not supported: always returns InternalError. |
|     Status init(const TDataSink& tsink) override { |
|         return Status::InternalError("{} should not init with TDataSink", |
|                                      JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::_name); |
|     } |
| |
|     Status init(const TPlanNode& tnode, RuntimeState* state) override; |
| |
|     Status prepare(RuntimeState* state) override; |
| |
|     Status sink(RuntimeState* state, Block* in_block, bool eos) override; |
| |
|     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; |
| |
|     [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const; |
| |
|     MOCK_FUNCTION std::string get_memory_usage_debug_str(RuntimeState* state) const; |
| |
|     // True only for a broadcast join whose sink local state has |
|     // _should_build_hash_table == false. The local state is not touched otherwise. |
|     bool should_dry_run(RuntimeState* state) override { |
|         if (!_is_broadcast_join) { |
|             return false; |
|         } |
|         const auto& local_state = |
|                 state->get_sink_local_state()->cast<HashJoinBuildSinkLocalState>(); |
|         return !local_state._should_build_hash_table; |
|     } |
| |
|     // Selects the exchange type required for the build input: |
|     //  - NULL_AWARE_LEFT_ANTI_JOIN                  -> NOOP |
|     //  - broadcast join, serial child               -> PASS_TO_ONE |
|     //  - broadcast join, non-serial child           -> NOOP |
|     //  - BUCKET_SHUFFLE / COLOCATE distribution     -> BUCKET_HASH_SHUFFLE on _partition_exprs |
|     //  - anything else                              -> HASH_SHUFFLE on _partition_exprs |
|     DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { |
|         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { |
|             return {ExchangeType::NOOP}; |
|         } |
|         if (_is_broadcast_join) { |
|             if (_child->is_serial_operator()) { |
|                 return DataDistribution(ExchangeType::PASS_TO_ONE); |
|             } |
|             return DataDistribution(ExchangeType::NOOP); |
|         } |
|         const bool bucketed = _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || |
|                               _join_distribution == TJoinDistributionType::COLOCATE; |
|         if (bucketed) { |
|             return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs); |
|         } |
|         return DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); |
|     } |
| |
|     // True for PARTITIONED, BUCKET_SHUFFLE and COLOCATE distributions. |
|     bool is_shuffled_operator() const override { |
|         if (_join_distribution == TJoinDistributionType::PARTITIONED) { |
|             return true; |
|         } |
|         if (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE) { |
|             return true; |
|         } |
|         return _join_distribution == TJoinDistributionType::COLOCATE; |
|     } |
| |
|     // True for BUCKET_SHUFFLE and COLOCATE distributions. |
|     bool is_colocated_operator() const override { |
|         if (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE) { |
|             return true; |
|         } |
|         return _join_distribution == TJoinDistributionType::COLOCATE; |
|     } |
| |
|     // True when this operator is shuffled but not colocated, or when |
|     // _followed_by_shuffled_operator is already set. |
|     bool followed_by_shuffled_operator() const override { |
|         const bool shuffled_not_colocated = is_shuffled_operator() && !is_colocated_operator(); |
|         return shuffled_not_colocated || _followed_by_shuffled_operator; |
|     } |
| |
|     // Mutable access to the per-column "null safe equal" flags. |
|     std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; } |
| |
|     // Only a LEFT_SEMI_JOIN with exactly one build expression, no other join conjunct |
|     // and no mark join may return directly, and only when the query option |
|     // enable_left_semi_direct_return_opt is both set and true. |
|     bool allow_left_semi_direct_return(RuntimeState* state) const { |
|         if (_join_op != TJoinOp::LEFT_SEMI_JOIN || _build_expr_ctxs.size() != 1) { |
|             return false; |
|         } |
|         if (_have_other_join_conjunct || _is_mark_join) { |
|             return false; |
|         } |
|         const auto& options = state->query_options(); |
|         return options.__isset.enable_left_semi_direct_return_opt && |
|                options.enable_left_semi_direct_return_opt; |
|     } |
| |
| private: |
|     friend class HashJoinBuildSinkLocalState; |
| |
|     const TJoinDistributionType::type _join_distribution; |
|     // Build-side expression contexts |
|     VExprContextSPtrs _build_expr_ctxs; |
|     // Flags marking whether the build hash table needs to store null values |
|     std::vector<bool> _serialize_null_into_key; |
| |
|     // Flags marking whether a join column supports null-safe equality |
|     std::vector<bool> _is_null_safe_eq_join; |
| |
|     bool _is_broadcast_join = false; |
|     std::vector<TExpr> _partition_exprs; |
| |
|     std::vector<SlotId> _hash_output_slot_ids; |
|     std::vector<bool> _should_keep_column_flags; |
|     bool _should_keep_hash_key_column = false; |
|     // Set when the build side has a variant column that must be output; such a |
|     // column needs to be finalized to speed up the join op. |
|     bool _need_finalize_variant_column = false; |
| |
|     // ASOF JOIN: build-side expression extracted from MATCH_CONDITION's right child. |
|     // Prepared against build child's row_desc directly (no intermediate tuple needed). |
|     VExprContextSPtr _asof_build_side_expr; |
|     TExprOpcode::type _asof_opcode = TExprOpcode::INVALID_OPCODE; |
| |
|     bool _use_shared_hash_table = false; |
|     std::atomic<bool> _signaled = false; |
|     std::mutex _mutex; |
|     std::vector<std::shared_ptr<Dependency>> _finish_dependencies; |
|     std::map<int, std::shared_ptr<RuntimeFilterWrapper>> _runtime_filters; |
| }; |
| |
| // Helper that feeds the extracted build-side key columns into the hash table owned |
| // by a HashTableContext and records the resulting memory counters on the parent |
| // local state. |
| template <class HashTableContext> |
| struct ProcessHashTableBuild { |
|     // rows:           number of build rows (row 0 is a mocked row, see run()) |
|     // build_raw_ptrs: key columns; kept by reference, so it must outlive this object |
|     // parent:         local state whose timer/counters are updated |
|     // batch_size:     forwarded to prepare_build() and used to resize bucket_nums |
|     // state:          stored but not used by run() |
|     ProcessHashTableBuild(uint32_t rows, ColumnRawPtrs& build_raw_ptrs, |
|                           HashJoinBuildSinkLocalState* parent, int batch_size, RuntimeState* state) |
|             : _rows(rows), |
|               _build_raw_ptrs(build_raw_ptrs), |
|               _parent(parent), |
|               _batch_size(batch_size), |
|               _state(state) {} |
| |
|     // Builds the hash table for join type JoinOpType. |
|     // |
|     // null_map:               optional null map of the build keys (may be null) |
|     // has_null_key:           in/out; set to true when a null key is found after row 0 |
|     // short_circuit_for_null: return OK without building once a null key is known |
|     // with_other_conjuncts:   whether the join has other conjuncts |
|     template <int JoinOpType> |
|     Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key, |
|                bool short_circuit_for_null, bool with_other_conjuncts) { |
|         if (null_map) { |
|             // Row 0 is mocked and is null, so the scan starts at row 1. |
|             const bool found_null = simd::contain_one(null_map->data() + 1, _rows - 1); |
|             if (found_null) { |
|                 *has_null_key = true; |
|             } |
|             if (short_circuit_for_null && *has_null_key) { |
|                 return Status::OK(); |
|             } |
|         } |
| |
|         SCOPED_TIMER(_parent->_build_table_insert_timer); |
|         hash_table_ctx.hash_table->template prepare_build<JoinOpType>( |
|                 _rows, _batch_size, *has_null_key, hash_table_ctx.direct_mapping_range()); |
| |
|         // With a single null-eq key, every null key must compare equal, so all null |
|         // keys are overwritten with the default value first. |
|         const bool single_key_with_nulls = |
|                 _build_raw_ptrs.size() == 1 && null_map && *has_null_key; |
|         if (single_key_with_nulls) { |
|             _build_raw_ptrs[0]->assume_mutable()->replace_column_null_data(null_map->data()); |
|         } |
| |
|         auto null_map_data = null_map ? null_map->data() : nullptr; |
|         hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows, null_map_data, true, true, |
|                                             hash_table_ctx.hash_table->get_bucket_size()); |
| |
|         // Only two cases need the null value to be reachable in the hash table: |
|         //  1. a null aware join that also has other conjuncts; |
|         //  2. a join on a single null safe equal key. |
|         constexpr bool is_null_aware_join = JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || |
|                                             JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN; |
|         bool keep_null_key = is_null_aware_join && with_other_conjuncts; |
|         if (!keep_null_key) { |
|             // Looked up only when case 1 does not apply. |
|             const auto& null_safe_flags = |
|                     _parent->parent()->cast<HashJoinBuildSinkOperatorX>().is_null_safe_eq_join(); |
|             keep_null_key = null_safe_flags.size() == 1 && null_safe_flags[0]; |
|         } |
| |
|         hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), |
|                                          _rows, keep_null_key); |
|         // Resize the bucket number buffer to one batch and release the excess capacity. |
|         hash_table_ctx.bucket_nums.resize(_batch_size); |
|         hash_table_ctx.bucket_nums.shrink_to_fit(); |
| |
|         COUNTER_SET(_parent->_hash_table_memory_usage, |
|                     static_cast<int64_t>(hash_table_ctx.hash_table->get_byte_size())); |
|         COUNTER_SET(_parent->_build_arena_memory_usage, |
|                     static_cast<int64_t>(hash_table_ctx.serialized_keys_size(true))); |
|         return Status::OK(); |
|     } |
| |
| private: |
|     const uint32_t _rows; |
|     ColumnRawPtrs& _build_raw_ptrs; |
|     HashJoinBuildSinkLocalState* _parent = nullptr; |
|     int _batch_size; |
|     RuntimeState* _state = nullptr; |
| }; |
| |
| } // namespace doris |
| #include "common/compile_check_end.h" |