blob: f9b162be62d3a3a973c07484265e6879b45b8e3d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <mutex>
#include <set>
#include <string>
#include "common/status.h"
#include "core/field.h"
#include "exec/common/util.hpp"
#include "exec/operator/operator.h"
#include "exec/pipeline/dependency.h"
#include "exec/runtime_filter/runtime_filter_consumer_helper.h"
#include "exec/scan/scan_node.h"
#include "exec/scan/scanner_context.h"
#include "exprs/function_filter.h"
#include "exprs/vectorized_fn_call.h"
#include "exprs/vin_predicate.h"
#include "runtime/descriptors.h"
#include "storage/predicate/filter_olap_param.h"
namespace doris {
// Project header included inside the namespace; it is paired with the
// "common/compile_check_end.h" include at the bottom of this file.
#include "common/compile_check_begin.h"
// Forward declarations: this header only names these types (ScannerDelegate as
// a std::shared_ptr element type, OlapScanner in a friend declaration), so
// their full definitions are not needed here.
class ScannerDelegate;
class OlapScanner;
} // namespace doris
namespace doris {
// Result of asking whether a predicate may be handed to the data source.
enum class PushDownType {
    // The data source cannot take this predicate at all.
    UNACCEPTABLE,
    // The data source takes this predicate and evaluates it completely.
    ACCEPTABLE,
    // The data source takes this predicate but cannot evaluate it completely.
    PARTIAL_ACCEPTABLE
};
// Non-template base of ScanLocalState<Derived>.
// It declares the scan interface that ScanLocalState<Derived> overrides, owns the
// profile counters and runtime-filter helper, and hosts the predicate
// normalization entry points that do not depend on Derived, so that they are
// not re-instantiated for every Derived type.
class ScanLocalStateBase : public PipelineXLocalState<> {
public:
    // The runtime-filter helper is built from the parent operator's runtime filter descs.
    ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
            : PipelineXLocalState<>(state, parent), _helper(parent->runtime_filter_descs()) {}
    ~ScanLocalStateBase() override = default;

    [[nodiscard]] virtual bool should_run_serial() const = 0;
    virtual RuntimeProfile* scanner_profile() = 0;
    [[nodiscard]] virtual const TupleDescriptor* input_tuple_desc() const = 0;
    [[nodiscard]] virtual const TupleDescriptor* output_tuple_desc() const = 0;
    virtual int64_t limit_per_scanner() = 0;
    virtual void set_scan_ranges(RuntimeState* state,
                                 const std::vector<TScanRangeParams>& scan_ranges) = 0;
    virtual TPushAggOp::type get_push_down_agg_type() = 0;

    // If scan operator is serial operator(like topn), its real parallelism is 1.
    // Otherwise, its real parallelism is query_parallel_instance_num.
    // query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
    // for file scan operator, its real parallelism will be 1 if it is in batch mode.
    // Related pr:
    // https://github.com/apache/doris/pull/42460
    // https://github.com/apache/doris/pull/44635
    [[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const;
    [[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
    [[nodiscard]] virtual ScannerScheduler* scan_scheduler(RuntimeState* state) const;

    // Name of the parent operator.
    [[nodiscard]] std::string get_name() { return _parent->get_name(); }
    uint64_t get_condition_cache_digest() const { return _condition_cache_digest; }

    Status update_late_arrival_runtime_filter(RuntimeState* state, int& arrived_rf_num);
    Status clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts);

protected:
    friend class ScannerContext;
    friend class Scanner;

    virtual Status _init_profile() = 0;

    std::atomic<bool> _opened {false};
    DependencySPtr _scan_dependency = nullptr;
    std::shared_ptr<RuntimeProfile> _scanner_profile;
    RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
    // Num of newly created free blocks when running query
    RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
    // Max num of scanner thread
    RuntimeProfile::Counter* _max_scan_concurrency = nullptr;
    RuntimeProfile::Counter* _min_scan_concurrency = nullptr;
    RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
    // time of get block from scanner
    RuntimeProfile::Counter* _scan_timer = nullptr;
    RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
    // time of filter output block from scanner
    RuntimeProfile::Counter* _filter_timer = nullptr;
    // rows read from the scanner (including those discarded by (pre)filters)
    RuntimeProfile::Counter* _rows_read_counter = nullptr;
    RuntimeProfile::Counter* _num_scanners = nullptr;
    RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
    RuntimeProfile::Counter* _scan_rows = nullptr;
    RuntimeProfile::Counter* _scan_bytes = nullptr;
    std::mutex _conjuncts_lock;
    RuntimeFilterConsumerHelper _helper;
    // magic number as seed to generate hash value for condition cache
    uint64_t _condition_cache_digest = 0;
    // condition cache filter stats
    RuntimeProfile::Counter* _condition_cache_hit_counter = nullptr;
    RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr;
    // Moved from ScanLocalState<Derived> to avoid re-instantiation for each Derived type.
    // Brace-initialized like _opened above so both atomics are declared the same way.
    std::atomic<bool> _eos {false};
    int _max_pushdown_conditions_per_column = 1024;
    // Save all function predicates which may be pushed down to data source.
    std::vector<FunctionFilter> _push_down_functions;

    // Virtual methods with default implementations; overridden by subclasses when supported.
    // Declared here so that the normalize methods below (non-Derived-template) can call them.
    // Every default declines the push down (returns false / UNACCEPTABLE).
    virtual bool _push_down_topn(const RuntimePredicate& predicate) { return false; }
    virtual PushDownType _should_push_down_bloom_filter() const {
        return PushDownType::UNACCEPTABLE;
    }
    virtual PushDownType _should_push_down_topn_filter() const {
        return PushDownType::UNACCEPTABLE;
    }
    virtual PushDownType _should_push_down_bitmap_filter() const {
        return PushDownType::UNACCEPTABLE;
    }
    virtual PushDownType _should_push_down_is_null_predicate(VectorizedFnCall* fn_call) const {
        return PushDownType::UNACCEPTABLE;
    }
    virtual PushDownType _should_push_down_in_predicate() const {
        return PushDownType::UNACCEPTABLE;
    }
    // NOTE(review): fn_name is taken by value, so the set is copied on every call.
    // Switching to a const reference would change the virtual signature and
    // requires updating every override at the same time.
    virtual PushDownType _should_push_down_binary_predicate(
            VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val,
            const std::set<std::string> fn_name) const {
        return PushDownType::UNACCEPTABLE;
    }
    // Default: reports UNACCEPTABLE through `pdt` and returns OK; the other
    // out-parameters are left untouched.
    virtual Status _should_push_down_function_filter(VectorizedFnCall* fn_call,
                                                     VExprContext* expr_ctx,
                                                     StringRef* constant_str,
                                                     doris::FunctionContext** fn_ctx,
                                                     PushDownType& pdt) {
        pdt = PushDownType::UNACCEPTABLE;
        return Status::OK();
    }

    // Non-templated normalize methods, moved here to avoid re-compilation per Derived type.
    Status _eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt);
    Status _normalize_bloom_filter(VExprContext* expr_ctx, const VExprSPtr& root,
                                   SlotDescriptor* slot,
                                   std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                   PushDownType* pdt);
    Status _normalize_topn_filter(VExprContext* expr_ctx, const VExprSPtr& root,
                                  SlotDescriptor* slot,
                                  std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                  PushDownType* pdt);
    Status _normalize_bitmap_filter(VExprContext* expr_ctx, const VExprSPtr& root,
                                    SlotDescriptor* slot,
                                    std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                    PushDownType* pdt);
    Status _normalize_function_filters(VExprContext* expr_ctx, SlotDescriptor* slot,
                                       PushDownType* pdt);

    // Inner PrimitiveType-template methods. Moved to base to avoid N(Derived)×M(PrimitiveType)
    // instantiation blowup: now instantiated M times total instead of N×M times.
    template <PrimitiveType T>
    Status _normalize_in_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
                                   SlotDescriptor* slot,
                                   std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                   ColumnValueRange<T>& range, PushDownType* pdt);
    template <PrimitiveType T>
    Status _normalize_binary_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
                                       SlotDescriptor* slot,
                                       std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                       ColumnValueRange<T>& range, PushDownType* pdt);
    template <PrimitiveType T>
    Status _normalize_is_null_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
                                        SlotDescriptor* slot,
                                        std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
                                        ColumnValueRange<T>& range, PushDownType* pdt);
    // The non-type parameter is named T, like the templates above, so that it does
    // not shadow the PrimitiveType type name. Template parameter names are not part
    // of the signature, so an out-of-line definition using another name still matches.
    template <PrimitiveType T, typename ChangeFixedValueRangeFunc>
    Status _change_value_range(bool is_equal_op, ColumnValueRange<T>& range,
                               const Field& value, const ChangeFixedValueRangeFunc& func,
                               const std::string& fn_name);
};
// Forward declaration of the scan operator template. ScanLocalState, defined
// next, names it in a friend declaration before the operator itself is defined.
template <typename LocalStateType>
class ScanOperatorX;
template <typename Derived>
class ScanLocalState : public ScanLocalStateBase {
ENABLE_FACTORY_CREATOR(ScanLocalState);
ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {}
~ScanLocalState() override = default;
virtual Status init(RuntimeState* state, LocalStateInfo& info) override;
virtual Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
std::string debug_string(int indentation_level) const final;
[[nodiscard]] bool should_run_serial() const override;
RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); }
[[nodiscard]] const TupleDescriptor* input_tuple_desc() const override;
[[nodiscard]] const TupleDescriptor* output_tuple_desc() const override;
int64_t limit_per_scanner() override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override {}
TPushAggOp::type get_push_down_agg_type() override;
std::vector<Dependency*> execution_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
std::vector<Dependency*> res(_filter_dependencies.size());
std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(),
[](DependencySPtr dep) { return dep.get(); });
return res;
}
std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }
std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) {
std::vector<int> result;
for (int id : _parent->cast<typename Derived::Parent>()._topn_filter_source_node_ids) {
if (!state->get_query_ctx()->has_runtime_predicate(id)) {
// compatible with older versions fe
continue;
}
const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
if (!pred.enable()) {
continue;
}
if (_push_down_topn(pred) == push_down) {
result.push_back(id);
}
}
return result;
}
protected:
template <typename LocalStateType>
friend class ScanOperatorX;
friend class ScannerContext;
friend class Scanner;
Status _init_profile() override;
virtual Status _process_conjuncts(RuntimeState* state) { return _normalize_conjuncts(state); }
virtual bool _should_push_down_common_expr() { return false; }
virtual bool _storage_no_merge() { return false; }
virtual bool _is_key_column(const std::string& col_name) { return false; }
// Create a list of scanners.
// The number of scanners is related to the implementation of the data source,
// predicate conditions, and scheduling strategy.
// So this method needs to be implemented separately by the subclass of ScanNode.
// Finally, a set of scanners that have been prepared are returned.
virtual Status _init_scanners(std::list<ScannerSPtr>* scanners) { return Status::OK(); }
Status _normalize_conjuncts(RuntimeState* state);
// Normalize a conjunct and try to convert it to column predicate recursively.
Status _normalize_predicate(VExprContext* context, const VExprSPtr& root,
VExprSPtr& output_expr);
bool _is_predicate_acting_on_slot(const VExprSPtrs& children, SlotDescriptor** slot_desc,
ColumnValueRangeType** range);
Status _prepare_scanners();
// Submit the scanner to the thread pool and start execution
Status _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners);
// For some conjunct there is chance to elimate cast operator
// Eg. Variant's sub column could eliminate cast in storage layer if
// cast dst column type equals storage column type
void get_cast_types_for_variants();
void _filter_and_collect_cast_type_for_variant(
const VExpr* expr,
std::unordered_map<std::string, std::vector<DataTypePtr>>& colname_to_cast_types);
Status _get_topn_filters(RuntimeState* state);
// Stores conjuncts that have been fully pushed down to the storage layer as predicate columns.
// These expr contexts are kept alive to prevent their FunctionContext and constant strings
// from being freed prematurely.
VExprContextSPtrs _stale_expr_ctxs;
VExprContextSPtrs _common_expr_ctxs_push_down;
atomic_shared_ptr<ScannerContext> _scanner_ctx;
// colname -> cast dst type
std::map<std::string, DataTypePtr> _cast_types_for_variants;
// slot id -> ColumnValueRange
// Parsed from conjuncts
phmap::flat_hash_map<int, ColumnValueRangeType> _slot_id_to_value_range;
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> _slot_id_to_predicates;
std::vector<std::shared_ptr<MutilColumnBlockPredicate>> _or_predicates;
std::vector<std::shared_ptr<Dependency>> _filter_dependencies;
// ScanLocalState owns the ownership of scanner, scanner context only has its weakptr
std::list<std::shared_ptr<ScannerDelegate>> _scanners;
Arena _arena;
};
// Scan operator template, parameterized on its local state type.
// It is a source operator (is_source() returns true) that holds the plan-level
// scan settings: tuple descriptors, slot lookup tables, limits, runtime filter
// descs and the aggregate push down type.
template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
    Status init(const TPlanNode& tnode, RuntimeState* state) override;
    Status prepare(RuntimeState* state) override;
    Status get_block(RuntimeState* state, Block* block, bool* eos) override;

    // Forwards to get_block() and, when it succeeds with a non-empty block,
    // updates the local state's returned rows / returned blocks counters.
    Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override {
        Status status = get_block(state, block, eos);
        if (!status.ok()) {
            return status;
        }
        if (auto rows = block->rows()) {
            auto* local_state = state->get_local_state(operator_id());
            COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
            COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
        }
        return status;
    }

    [[nodiscard]] bool is_source() const override { return true; }
    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;

    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
        return _runtime_filter_descs;
    }

    // Default: -1. Subclasses may override to map a column name to an id.
    [[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }

    TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }

    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
        if (OperatorX<LocalStateType>::is_serial_operator()) {
            // `is_serial_operator()` returns true means we ignore the distribution.
            return {ExchangeType::NOOP};
        }
        return {ExchangeType::BUCKET_HASH_SHUFFLE};
    }

    // Clears the free blocks of the scanner context, if one has been set.
    void set_low_memory_mode(RuntimeState* state) override {
        auto& local_state = get_local_state(state);
        if (auto ctx = local_state._scanner_ctx.load()) {
            ctx->clear_free_blocks();
        }
    }

    using OperatorX<LocalStateType>::node_id;
    using OperatorX<LocalStateType>::operator_id;
    using OperatorX<LocalStateType>::get_local_state;

#ifdef BE_TEST
    ScanOperatorX() = default;
#endif

protected:
    using LocalState = LocalStateType;
    friend class OlapScanner;

    ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                  const DescriptorTbl& descs, int parallel_tasks = 0);
    virtual ~ScanOperatorX() = default;

    template <typename Derived>
    friend class ScanLocalState;
    friend class OlapScanLocalState;

    // For load scan node, there should be both input and output tuple descriptor.
    // For query scan node, there is only output_tuple_desc.
    TupleId _input_tuple_id = -1;
    TupleId _output_tuple_id = -1;
    const TupleDescriptor* _input_tuple_desc = nullptr;
    const TupleDescriptor* _output_tuple_desc = nullptr;
    phmap::flat_hash_map<int, SlotDescriptor*> _slot_id_to_slot_desc;
    std::unordered_map<std::string, int> _colname_to_slot_id;
    // These two values are from query_options
    int _max_scan_key_num = 48;
    int _max_pushdown_conditions_per_column = 1024;
    // If the query like select * from table limit 10; then the query should run in
    // single scanner to avoid too many scanners which will cause lots of useless read.
    bool _should_run_serial = false;
    VExprContextSPtrs _common_expr_ctxs_push_down;
    // If sort info is set, push limit to each scanner;
    int64_t _limit_per_scanner = -1;
    // Shared remaining limit across all parallel instances and their scanners.
    // Initialized to _limit (SQL LIMIT); -1 means no limit.
    std::atomic<int64_t> _shared_scan_limit {-1};
    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
    // Value-initialized (zero) so that get_push_down_agg_type() never reads an
    // indeterminate value on an object that has not been through init(), e.g. one
    // built with the BE_TEST default constructor.
    // NOTE(review): presumably the zero value is the "no push down" enumerator — confirm.
    TPushAggOp::type _push_down_agg_type {};
    // Record the value of the aggregate function 'count' from doris's be
    int64_t _push_down_count = -1;
    const int _parallel_tasks = 0;
    std::vector<int> _topn_filter_source_node_ids;
};
#include "common/compile_check_end.h"
} // namespace doris