blob: b0a0868566eeee136dce22e78d78adddc68f4c3c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "exec/operator/operator.h"
#include "exec/runtime_filter/runtime_filter_producer_helper_set.h"
namespace doris {
#include "common/compile_check_begin.h"
// Forward declarations.
// - HashTableBuild is befriended by SetSinkOperatorX below; it is defined elsewhere.
// - SetSinkOperatorX is needed by SetSinkLocalState (its `Parent` alias and friend
//   declaration) before the operator's own definition further down in this header.
template <typename HashTableContext, bool is_intersected>
struct HashTableBuild;

template <bool is_intersect>
class SetSinkOperatorX;
/// Per-instance state of the set-operation (INTERSECT / EXCEPT) sink.
///
/// @tparam is_intersect true for INTERSECT, false for EXCEPT; selects the matching
///         SetSinkOperatorX as the owning operator (`Parent`).
template <bool is_intersect>
class SetSinkLocalState final : public PipelineXSinkLocalState<SetSharedState> {
public:
    ENABLE_FACTORY_CREATOR(SetSinkLocalState);
    using Base = PipelineXSinkLocalState<SetSharedState>;
    using Parent = SetSinkOperatorX<is_intersect>;

    /// Builds the local state for `parent` and creates a counted finish dependency
    /// identified by the parent's operator id, node id and "<name>_FINISH_DEPENDENCY".
    SetSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
            : Base(parent, state),
              _finish_dependency(std::make_shared<CountedFinishDependency>(
                      parent->operator_id(), parent->node_id(),
                      parent->get_name() + "_FINISH_DEPENDENCY")) {}

    // Lifecycle hooks; implemented out of line.
    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
    Status open(RuntimeState* state) override;
    Status terminate(RuntimeState* state) override;
    Status close(RuntimeState* state, Status exec_status) override;

private:
    friend Parent;

    MutableBlock _mutable_block;
    // One result expr list per child.
    VExprContextSPtrs _child_exprs;

    // Runtime profile counters; nullptr until assigned (not done in this header).
    RuntimeProfile::Counter* _merge_block_timer = nullptr;
    RuntimeProfile::Counter* _build_timer = nullptr;
    RuntimeProfile::Counter* _hash_table_size = nullptr;
    RuntimeProfile::Counter* _valid_element_in_hash_table = nullptr;

    // NOTE(review): not initialised in this header — presumably created in init()/open(); confirm.
    std::shared_ptr<RuntimeFilterProducerHelperSet> _runtime_filter_producer_helper;
    // Created in the constructor; declared last so it is initialised after all other members.
    std::shared_ptr<CountedFinishDependency> _finish_dependency;
};
/// Sink operator of an INTERSECT / EXCEPT set operation.
///
/// It consumes child `_cur_child_id` (0) of the set-operation plan node; the constructor
/// checks this in debug builds. It requires its input to be hash-shuffled on
/// `_partition_exprs` (bucket-hash when colocated).
///
/// @tparam is_intersect true for INTERSECT, false for EXCEPT. Every field read from the
///         plan node comes from the thrift branch selected by this parameter.
template <bool is_intersect>
class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_intersect>> {
public:
    using Base = DataSinkOperatorX<SetSinkLocalState<is_intersect>>;
    using DataSinkOperatorXBase::operator_id;
    using Base::get_local_state;
    using typename Base::LocalState;
    friend class SetSinkLocalState<is_intersect>;

    /// @param child_id index of the child this sink consumes; must equal `_cur_child_id` (0).
    /// @param sink_id  forwarded to the base sink constructor.
    /// @param dest_id  forwarded to the base sink constructor.
    /// @param pool     unused here.
    /// @param tnode    INTERSECT_NODE / EXCEPT_NODE plan node; its `node_type` must agree
    ///                 with `is_intersect` (checked in debug builds).
    /// @param descs    unused here.
    SetSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* pool,
                     const TPlanNode& tnode, const DescriptorTbl& descs)
            : Base(sink_id, tnode.node_id, dest_id),
              // Pick the thrift branch with the template parameter for every field, so the
              // child count, colocation flag and partition exprs always come from one node.
              _child_quantity(is_intersect ? tnode.intersect_node.result_expr_lists.size()
                                           : tnode.except_node.result_expr_lists.size()),
              _is_colocate(is_intersect ? tnode.intersect_node.is_colocate
                                        : tnode.except_node.is_colocate),
              // Prefer the explicit distribution exprs; fall back to this child's result exprs.
              _partition_exprs(tnode.__isset.distribute_expr_lists
                                       ? tnode.distribute_expr_lists[child_id]
                                       : (is_intersect
                                                  ? tnode.intersect_node.result_expr_lists[child_id]
                                                  : tnode.except_node.result_expr_lists[child_id])),
              _runtime_filter_descs(tnode.runtime_filters) {
        DCHECK((tnode.node_type == TPlanNodeType::type::INTERSECT_NODE) == is_intersect);
        DCHECK_EQ(child_id, _cur_child_id);
        DCHECK_GT(_child_quantity, 1);
    }

#ifdef BE_TEST
    /// Test-only constructor: no plan node, not colocated, no partition exprs.
    explicit SetSinkOperatorX(int child_quantity)
            : _child_quantity(child_quantity), _is_colocate(false) {}
#endif

    ~SetSinkOperatorX() override = default;

    /// Set sinks are built from a TPlanNode; initialising from a TDataSink is always an error.
    Status init(const TDataSink& tsink) override {
        return Status::InternalError("{} should not init with TDataSink", Base::_name);
    }

    Status init(const TPlanNode& tnode, RuntimeState* state) override;
    Status prepare(RuntimeState* state) override;
    Status sink(RuntimeState* state, Block* in_block, bool eos) override;

    /// Bucket-hash shuffle when colocated, plain hash shuffle otherwise, both keyed on
    /// `_partition_exprs`.
    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
        return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
    }

    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;

    bool is_shuffled_operator() const override { return true; }
    bool is_colocated_operator() const override { return _is_colocate; }

    /// True when this sink itself shuffles without colocation, or when the base operator
    /// was already marked as followed by a shuffled operator.
    bool followed_by_shuffled_operator() const override {
        return (is_shuffled_operator() && !is_colocated_operator()) ||
               Base::_followed_by_shuffled_operator;
    }

private:
    template <class HashTableContext, bool is_intersected>
    friend struct HashTableBuild;

    Status _process_build_block(SetSinkLocalState<is_intersect>& local_state, Block& block,
                                RuntimeState* state);
    Status _extract_build_column(SetSinkLocalState<is_intersect>& local_state, Block& block,
                                 ColumnRawPtrs& raw_ptrs, size_t& rows);

    // Index of the only child this sink consumes.
    const int _cur_child_id = 0;
    // Number of children of the set-operation node (size of its result expr lists).
    const size_t _child_quantity;
    // One result expr list per child.
    // NOTE(review): SetSinkLocalState declares its own `_child_exprs`; confirm which is used.
    VExprContextSPtrs _child_exprs;
    const bool _is_colocate;
    // Shuffle keys for this sink's input (see constructor for how they are chosen).
    std::vector<TExpr> _partition_exprs;
    using OperatorBase::_child;
    const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
};
#include "common/compile_check_end.h"
} // namespace doris