blob: c6f492e8df743c7e8a8f5af268972acd0b43d104 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "exec/operator/join_build_sink_operator.h"
#include "exec/operator/operator.h"
#include "exec/runtime_filter/runtime_filter_producer_helper.h"
namespace doris {
#include "common/compile_check_begin.h"
class HashJoinBuildSinkOperatorX;
// Local sink state paired with HashJoinBuildSinkOperatorX (see `Parent`).
// It declares the build-side expression contexts, the accumulated build block,
// the runtime filter producer helper and the profile counters used while the
// build side of a hash join is consumed. All non-trivial member functions are
// defined out of line.
// NOTE(review): MOCK_REMOVE / MOCK_FUNCTION are project macros; presumably they
// relax `final` / add `virtual` in test builds - confirm against their definition.
class HashJoinBuildSinkLocalState MOCK_REMOVE(final)
        : public JoinBuildSinkLocalState<HashJoinSharedState, HashJoinBuildSinkLocalState> {
public:
    ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
    using Base = JoinBuildSinkLocalState<HashJoinSharedState, HashJoinBuildSinkLocalState>;
    using Parent = HashJoinBuildSinkOperatorX;

    HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
    ~HashJoinBuildSinkLocalState() override = default;

    // Lifecycle hooks overriding the base local state.
    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
    Status open(RuntimeState* state) override;
    Status terminate(RuntimeState* state) override;

    Status process_build_block(RuntimeState* state, Block& block);
    // Build ASOF JOIN pre-sorted index for O(log K) lookup
    Status build_asof_index(Block& block);
    void init_short_circuit_for_probe();
    bool build_unique() const;

    // Non-owning view of the finish dependency held by this state.
    Dependency* finishdependency() override { return _finish_dependency.get(); }

    Status close(RuntimeState* state, Status exec_status) override;

    [[nodiscard]] MOCK_FUNCTION size_t get_reserve_mem_size(RuntimeState* state, bool eos);

protected:
    Status _hash_table_init(RuntimeState* state, const ColumnRawPtrs& raw_ptrs);
    void _set_build_side_has_external_nullmap(Block& block, const std::vector<int>& res_col_ids);
    Status _do_evaluate(Block& block, VExprContextSPtrs& exprs,
                        RuntimeProfile::Counter& expr_call_timer, std::vector<int>& res_col_ids);
    std::vector<uint16_t> _convert_block_to_null(Block& block);
    Status _extract_join_column(Block& block, ColumnUInt8::MutablePtr& null_map,
                                ColumnRawPtrs& raw_ptrs, const std::vector<int>& res_col_ids);

    // These types read and write the protected members below directly.
    friend class HashJoinBuildSinkOperatorX;
    friend class PartitionedHashJoinSinkLocalState;
    template <class HashTableContext>
    friend struct ProcessHashTableBuild;

    // build expr
    VExprContextSPtrs _build_expr_ctxs;
    std::vector<ColumnPtr> _key_columns_holder;

    bool _should_build_hash_table = true;

    size_t _evaluate_mem_usage = 0;
    size_t _build_side_rows = 0;
    // Default-initialised so that an early read is well defined, like every other
    // scalar member of this class.
    // NOTE(review): presumably overwritten with the real task index in init() - confirm.
    int _task_idx = 0;

    MutableBlock _build_side_mutable_block;
    std::shared_ptr<RuntimeFilterProducerHelper> _runtime_filter_producer_helper;

    /*
     * Comparing a null value with any other value yields null. For most joins
     * (all except null-aware joins and null-safe-equal joins) an equality
     * condition involving null is therefore treated as false, so null keys
     * do not need to be added to the hash table.
     */
    bool _build_side_has_external_nullmap = false;
    std::vector<int> _build_col_ids;
    std::shared_ptr<CountedFinishDependency> _finish_dependency;

    // Profile counters; all start as nullptr and are expected to be registered
    // elsewhere before use.
    RuntimeProfile::Counter* _build_table_timer = nullptr;
    RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
    RuntimeProfile::Counter* _build_table_insert_timer = nullptr;
    RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr;
    RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr;
    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
    RuntimeProfile::Counter* _build_arena_memory_usage = nullptr;

    // ASOF index build counters
    RuntimeProfile::Counter* _asof_index_total_timer = nullptr;
    RuntimeProfile::Counter* _asof_index_expr_timer = nullptr;
    RuntimeProfile::Counter* _asof_index_sort_timer = nullptr;
    RuntimeProfile::Counter* _asof_index_group_timer = nullptr;
};
// Build-side sink operator of the hash join. Its local state type is
// HashJoinBuildSinkLocalState. Besides the out-of-line lifecycle functions it
// answers planner-style questions (required data distribution, shuffle /
// colocate classification) from `_join_distribution` and `_is_broadcast_join`.
class HashJoinBuildSinkOperatorX MOCK_REMOVE(final)
        : public JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState> {
public:
    HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
                               const TPlanNode& tnode, const DescriptorTbl& descs);

    // The TDataSink overload is not supported by this operator and always
    // reports an internal error; use the TPlanNode overload instead.
    Status init(const TDataSink& tsink) override {
        const auto& operator_name = JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::_name;
        return Status::InternalError("{} should not init with TDataSink", operator_name);
    }

    Status init(const TPlanNode& tnode, RuntimeState* state) override;

    Status prepare(RuntimeState* state) override;

    Status sink(RuntimeState* state, Block* in_block, bool eos) override;

    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;

    [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;

    MOCK_FUNCTION std::string get_memory_usage_debug_str(RuntimeState* state) const;

    // True only for a broadcast join whose local state has
    // `_should_build_hash_table` cleared. The local state is not touched at
    // all when the join is not a broadcast join.
    bool should_dry_run(RuntimeState* state) override {
        if (!_is_broadcast_join) {
            return false;
        }
        const auto& local_state =
                state->get_sink_local_state()->cast<HashJoinBuildSinkLocalState>();
        return !local_state._should_build_hash_table;
    }

    // Data distribution this sink asks for:
    //  * null-aware left anti join      -> NOOP
    //  * broadcast join                 -> PASS_TO_ONE if the child is serial, else NOOP
    //  * bucket-shuffle / colocate join -> BUCKET_HASH_SHUFFLE on `_partition_exprs`
    //  * anything else                  -> HASH_SHUFFLE on `_partition_exprs`
    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
            return DataDistribution(ExchangeType::NOOP);
        }
        if (_is_broadcast_join) {
            const auto broadcast_exchange = _child->is_serial_operator()
                                                    ? ExchangeType::PASS_TO_ONE
                                                    : ExchangeType::NOOP;
            return DataDistribution(broadcast_exchange);
        }
        const bool bucketed = _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                              _join_distribution == TJoinDistributionType::COLOCATE;
        const auto shuffle_exchange =
                bucketed ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
        return DataDistribution(shuffle_exchange, _partition_exprs);
    }

    // PARTITIONED, BUCKET_SHUFFLE and COLOCATE joins count as shuffled.
    bool is_shuffled_operator() const override {
        switch (_join_distribution) {
        case TJoinDistributionType::PARTITIONED:
        case TJoinDistributionType::BUCKET_SHUFFLE:
        case TJoinDistributionType::COLOCATE:
            return true;
        default:
            return false;
        }
    }

    // BUCKET_SHUFFLE and COLOCATE joins count as colocated.
    bool is_colocated_operator() const override {
        switch (_join_distribution) {
        case TJoinDistributionType::BUCKET_SHUFFLE:
        case TJoinDistributionType::COLOCATE:
            return true;
        default:
            return false;
        }
    }

    // True when this operator is shuffled but not colocated; otherwise falls
    // back to the `_followed_by_shuffled_operator` flag.
    bool followed_by_shuffled_operator() const override {
        if (is_shuffled_operator() && !is_colocated_operator()) {
            return true;
        }
        return _followed_by_shuffled_operator;
    }

    // Mutable access to the per-join-column null-safe-equal flags.
    std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }

    // Only a left semi join with exactly one build expression, no other join
    // conjunct and no mark join may return directly, and only when the query
    // option `enable_left_semi_direct_return_opt` is both set and true.
    bool allow_left_semi_direct_return(RuntimeState* state) const {
        if (_join_op != TJoinOp::LEFT_SEMI_JOIN || _build_expr_ctxs.size() != 1) {
            return false;
        }
        if (_have_other_join_conjunct || _is_mark_join) {
            return false;
        }
        const auto& options = state->query_options();
        return options.__isset.enable_left_semi_direct_return_opt &&
               options.enable_left_semi_direct_return_opt;
    }

private:
    friend class HashJoinBuildSinkLocalState;

    const TJoinDistributionType::type _join_distribution;

    // Expression contexts evaluated on the build side.
    VExprContextSPtrs _build_expr_ctxs;
    // Per key: whether the build hash table needs to store the null value.
    std::vector<bool> _serialize_null_into_key;
    // Per join column: whether it supports null-safe equality.
    std::vector<bool> _is_null_safe_eq_join;

    bool _is_broadcast_join = false;
    std::vector<TExpr> _partition_exprs;

    std::vector<SlotId> _hash_output_slot_ids;
    std::vector<bool> _should_keep_column_flags;
    bool _should_keep_hash_key_column = false;
    // Set when the build side has a variant column that must be output; the
    // variant column then needs to be finalized to speed up the join.
    bool _need_finalize_variant_column = false;

    // ASOF JOIN: build-side expression taken from the right child of MATCH_CONDITION.
    // It is prepared directly against the build child's row_desc (no intermediate tuple).
    VExprContextSPtr _asof_build_side_expr;
    TExprOpcode::type _asof_opcode = TExprOpcode::INVALID_OPCODE;

    bool _use_shared_hash_table = false;
    std::atomic<bool> _signaled = false;
    std::mutex _mutex;
    std::vector<std::shared_ptr<Dependency>> _finish_dependencies;
    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> _runtime_filters;
};
// Functor-like helper that inserts the evaluated build-side key columns into the
// hash table owned by a HashTableContext and records the resulting memory usage
// on the owning HashJoinBuildSinkLocalState.
template <class HashTableContext>
struct ProcessHashTableBuild {
    // rows           : number of build rows (row 0 is a mocked row, see run()).
    // build_raw_ptrs : key columns to insert; held by reference, not copied.
    // parent         : local state whose timers/counters are updated.
    // batch_size     : passed to prepare_build() and used as the final size of `bucket_nums`.
    // state          : stored but not used inside this struct.
    ProcessHashTableBuild(uint32_t rows, ColumnRawPtrs& build_raw_ptrs,
                          HashJoinBuildSinkLocalState* parent, int batch_size, RuntimeState* state)
            : _rows(rows),
              _build_raw_ptrs(build_raw_ptrs),
              _parent(parent),
              _batch_size(batch_size),
              _state(state) {}

    // Builds the hash table for join type `JoinOpType`.
    //
    // null_map               : optional null map of the build keys; may be null.
    // has_null_key           : in/out flag, set to true when a null key is found
    //                          in rows [1, _rows).
    // short_circuit_for_null : when true and a null key exists, return before
    //                          building anything.
    // with_other_conjuncts   : whether the join has other conjuncts; only matters
    //                          for the null-aware join types.
    //
    // Always returns Status::OK().
    template <int JoinOpType>
    Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key,
               bool short_circuit_for_null, bool with_other_conjuncts) {
        if (null_map) {
            // Row 0 is mocked and is null, so the scan starts at row 1.
            const bool found_null = simd::contain_one(null_map->data() + 1, _rows - 1);
            if (found_null) {
                *has_null_key = true;
            }
            const bool can_skip_build = short_circuit_for_null && *has_null_key;
            if (can_skip_build) {
                return Status::OK();
            }
        }

        // Everything from here on is accounted to the insert timer.
        SCOPED_TIMER(_parent->_build_table_insert_timer);
        hash_table_ctx.hash_table->template prepare_build<JoinOpType>(
                _rows, _batch_size, *has_null_key, hash_table_ctx.direct_mapping_range());

        // With a single null-safe-equal key, every null key must compare equal,
        // so the null slots are overwritten with the default value.
        const bool single_key_with_nulls =
                _build_raw_ptrs.size() == 1 && null_map && *has_null_key;
        if (single_key_with_nulls) {
            _build_raw_ptrs[0]->assume_mutable()->replace_column_null_data(null_map->data());
        }

        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
                                            null_map ? null_map->data() : nullptr, true, true,
                                            hash_table_ctx.hash_table->get_bucket_size());

        // Null keys are kept in the hash table in exactly two situations:
        //  1. a null-aware join that also has other conjuncts;
        //  2. a join with one single null-safe-equal key.
        constexpr bool is_null_aware_join = JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
                                            JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
        bool keep_null_key = false;
        if (is_null_aware_join && with_other_conjuncts) {
            keep_null_key = true;
        } else {
            auto& null_safe_flags = _parent->parent()
                                            ->template cast<HashJoinBuildSinkOperatorX>()
                                            .is_null_safe_eq_join();
            if (null_safe_flags.size() == 1 && null_safe_flags[0]) {
                keep_null_key = true;
            }
        }

        hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(),
                                         _rows, keep_null_key);

        // Trim the bucket-number buffer back to one batch and release the excess capacity.
        hash_table_ctx.bucket_nums.resize(_batch_size);
        hash_table_ctx.bucket_nums.shrink_to_fit();

        COUNTER_SET(_parent->_hash_table_memory_usage,
                    static_cast<int64_t>(hash_table_ctx.hash_table->get_byte_size()));
        COUNTER_SET(_parent->_build_arena_memory_usage,
                    static_cast<int64_t>(hash_table_ctx.serialized_keys_size(true)));
        return Status::OK();
    }

private:
    const uint32_t _rows;
    ColumnRawPtrs& _build_raw_ptrs;
    HashJoinBuildSinkLocalState* _parent = nullptr;
    int _batch_size;
    RuntimeState* _state = nullptr;
};
} // namespace doris
#include "common/compile_check_end.h"