[feature](runtime-filter) Add runtime filter partition pruning
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Add runtime filter partition pruning support and cover planner/runtime handling for partition boundaries. Preserve partition-pruning monotonicity for grouped runtime filters with multiple scan targets, and run partition pruning for runtime filters acquired before scanner construction.
### Release note
Add runtime filter partition pruning support.
### Check List (For Author)
- Test: verified BE build, FE build, BE format check, and FE checkstyle all pass
- Manual test: ./build.sh --be
- Manual test: ./build.sh --fe
- Manual test: build-support/check-format.sh
- Manual test: cd fe && mvn checkstyle:check -pl fe-core
- Regression test: added `rf_partition_pruning` regression coverage for grouped runtime filters; it was not run locally because the running cluster had not been restarted with the patched build.
- Behavior changed: Yes (adds runtime filter partition pruning when enabled)
- Does this need documentation: No
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
diff --git a/be/src/exec/operator/olap_scan_operator.cpp b/be/src/exec/operator/olap_scan_operator.cpp
index 3cde3c9..ac05648 100644
--- a/be/src/exec/operator/olap_scan_operator.cpp
+++ b/be/src/exec/operator/olap_scan_operator.cpp
@@ -22,6 +22,7 @@
#include <memory>
#include <numeric>
#include <optional>
+#include <shared_mutex>
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
@@ -33,10 +34,13 @@
#include "exec/scan/olap_scanner.h"
#include "exec/scan/parallel_scanner_builder.h"
#include "exprs/function/in.h"
+#include "exprs/hybrid_set.h"
#include "exprs/score_runtime.h"
#include "exprs/vectorized_fn_call.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
+#include "exprs/vliteral.h"
+#include "exprs/vruntimefilter_wrapper.h"
#include "exprs/vslot_ref.h"
#include "io/cache/block_file_cache_profile.h"
#include "runtime/query_cache/query_cache.h"
@@ -91,6 +95,9 @@
RETURN_IF_ERROR(Base::init(state, info));
RETURN_IF_ERROR(_sync_cloud_tablets(state));
+
+ _attach_partition_boundaries();
+
return Status::OK();
}
@@ -121,6 +128,8 @@
// Rows read from storage.
// Include the rows read from doris page cache.
_scan_rows = ADD_COUNTER(custom_profile(), "ScanRows", TUnit::UNIT);
+ _tablets_pruned_by_rf_counter =
+ ADD_COUNTER(custom_profile(), "TabletsPrunedByRuntimeFilter", TUnit::UNIT);
// 1. init segment profile
_segment_profile.reset(new RuntimeProfile("SegmentIterator"));
@@ -499,6 +508,50 @@
_cond_ranges.emplace_back(new doris::OlapScanRange());
}
+ // Filter out tablets whose partitions have been pruned by runtime filters.
+ //
+ // TODO(rf-partition-prune): this happens after OlapScanLocalState::init()
+ // has already executed _sync_cloud_tablets() (in cloud mode that performs
+ // get_tablet() and waits for sync_rowsets() on every scan range) and after
+ // capture_read_source() has been called for every tablet. RFs that are
+ // ready at start therefore still pay the full per-tablet metadata / read
+ // source setup cost for partitions that are immediately dropped here, so
+ // the current feature only saves scanner construction and scan IO, not
+ // the expensive setup work that partition pruning was originally intended
+ // to avoid. To fix this we need to (a) add partition_id onto
+ // TPaloScanRange so BE knows the partition without first materializing
+ // the tablet, (b) acquire ready-at-start RFs before _sync_cloud_tablets()
+ // and run partition pruning there to filter _scan_ranges by partition_id
+ // so the heavy per-tablet work is skipped for pruned partitions.
+ if (_rf_partition_pruner.pruned_partition_count() > 0) {
+ DCHECK_EQ(_tablets.size(), _scan_ranges.size());
+ DCHECK_EQ(_tablets.size(), _read_sources.size());
+ size_t write_idx = 0;
+ for (size_t read_idx = 0; read_idx < _tablets.size(); ++read_idx) {
+ int64_t pid = _tablets[read_idx].tablet->partition_id();
+ if (!_rf_partition_pruner.is_partition_pruned(pid)) {
+ if (write_idx != read_idx) {
+ _tablets[write_idx] = std::move(_tablets[read_idx]);
+ _scan_ranges[write_idx] = std::move(_scan_ranges[read_idx]);
+ _read_sources[write_idx] = std::move(_read_sources[read_idx]);
+ }
+ ++write_idx;
+ }
+ }
+ if (write_idx < _tablets.size()) {
+ COUNTER_SET(_tablets_pruned_by_rf_counter,
+ static_cast<int64_t>(_tablets.size() - write_idx));
+ _tablets.resize(write_idx);
+ _scan_ranges.resize(write_idx);
+ _read_sources.resize(write_idx);
+ }
+ if (_tablets.empty()) {
+ _eos = true;
+ _scan_dependency->set_ready();
+ return Status::OK();
+ }
+ }
+
bool enable_parallel_scan = state()->enable_parallel_scan();
// The flag of preagg's meaning is whether return pre agg data(or partial agg data)
@@ -1090,4 +1143,31 @@
}
}
+// ======== Runtime Filter Partition Pruning ========
+
+Status OlapScanOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(ScanOperatorX<OlapScanLocalState>::prepare(state));
+ // Parse partition boundaries once per fragment-on-host. Cost (VLiteral
+ // construction + ColumnPtr materialization + ColumnValueRange build per
+ // boundary literal) is plan-time-static, so doing it here -- rather than
+ // in per-instance LocalState::init -- avoids paying it parallel_tasks
+ // times. The parsed result lives on the generic ScanOperatorX base and is
+ // read by every per-instance pruner via OperatorXBase::parsed_partition_boundaries().
+ if (state->query_options().enable_runtime_filter_partition_prune &&
+ _olap_scan_node.__isset.partition_boundaries &&
+ !_olap_scan_node.partition_boundaries.empty()) {
+ _parsed_partition_boundaries.parse(_olap_scan_node.partition_boundaries,
+ _slot_id_to_slot_desc);
+ }
+ return Status::OK();
+}
+
+void OlapScanLocalState::_attach_partition_boundaries() {
+ const auto* parsed = _parent->parsed_partition_boundaries();
+ if (parsed == nullptr || parsed->empty()) {
+ return;
+ }
+ COUNTER_SET(_total_partitions_rf_counter, parsed->total_partitions());
+}
+
} // namespace doris
diff --git a/be/src/exec/operator/olap_scan_operator.h b/be/src/exec/operator/olap_scan_operator.h
index e86b3c6..e6f42eb 100644
--- a/be/src/exec/operator/olap_scan_operator.h
+++ b/be/src/exec/operator/olap_scan_operator.h
@@ -19,13 +19,16 @@
#include <stdint.h>
+#include <shared_mutex>
#include <string>
+#include <unordered_set>
#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "exec/operator/operator.h"
#include "exec/operator/scan_operator.h"
#include "runtime/runtime_profile.h"
+#include "storage/olap_scan_common.h"
#include "storage/tablet/tablet_reader.h"
namespace doris {
@@ -320,6 +323,14 @@
std::map<SlotId, size_t> _slot_id_to_index_in_block;
// this map is needed for scanner opening.
std::map<SlotId, DataTypePtr> _slot_id_to_col_type;
+
+ // ---- Runtime-filter partition pruning ----
+ // Attaches this per-instance pruner to the shared parse result owned by
+ // OlapScanOperatorX (parsed once in OperatorX::prepare()). Cheap: pointer
+ // assignment plus a counter set, no parsing work.
+ void _attach_partition_boundaries();
+
+ RuntimeProfile::Counter* _tablets_pruned_by_rf_counter = nullptr;
};
class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
@@ -328,6 +339,8 @@
const DescriptorTbl& descs, int parallel_tasks,
const TQueryCacheParam& cache_param);
+ Status prepare(RuntimeState* state) override;
+
int get_column_id(const std::string& col_name) const override {
if (!_tablet_schema) {
return -1;
diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h
index 0d500ca..3dc4b89 100644
--- a/be/src/exec/operator/operator.h
+++ b/be/src/exec/operator/operator.h
@@ -53,6 +53,7 @@
class AsyncResultWriter;
class ScoreRuntime;
class AnnTopNRuntime;
+class ParsedPartitionBoundaries;
} // namespace doris
namespace doris {
@@ -829,6 +830,15 @@
[[noreturn]] virtual const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
}
+
+ // Per-fragment shared partition-boundary parse result, used for
+ // runtime-filter partition pruning. Returns nullptr for operators that
+ // don't support this feature (default). Scan operators override to expose
+ // their parsed boundaries; the per-instance pruning state lives on the
+ // ScanLocalState. This sits on the generic OperatorXBase so non-templated
+ // ScanLocalStateBase methods can fetch it without down-casting `_parent`
+ // to a specific scan type.
+ virtual const ParsedPartitionBoundaries* parsed_partition_boundaries() const { return nullptr; }
[[nodiscard]] std::string get_name() const override { return _op_name; }
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
bool is_blockable(RuntimeState* state) const override {
diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp
index 5a731ae..a2b5178 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -74,6 +74,7 @@
int& arrived_rf_num) {
// Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads
std::unique_lock lock(_conjuncts_lock);
+ size_t conjuncts_before = _conjuncts.size();
RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(),
arrived_rf_num, _conjuncts));
if (state->enable_adjust_conjunct_order_by_cost()) {
@@ -81,6 +82,14 @@
return a->execute_cost() < b->execute_cost();
});
};
+ // Only re-run partition pruning when try_append_late_arrival_runtime_filter
+ // actually appended new conjuncts. Otherwise this hook would re-scan all
+ // partition boundaries on every scheduler pass while there are still
+ // unapplied RFs (Scanner::_applied_rf_num is not advanced here), wasting
+ // CPU re-evaluating the same set of RFs against the same boundaries.
+ if (_conjuncts.size() > conjuncts_before) {
+ _on_runtime_filter_update();
+ }
return Status::OK();
}
@@ -94,6 +103,33 @@
return Status::OK();
}
+bool ScanLocalStateBase::is_partition_pruned(int64_t partition_id) const {
+ return _rf_partition_pruner.is_partition_pruned(partition_id);
+}
+
+void ScanLocalStateBase::_on_runtime_filter_update() {
+ const auto* parsed = _parent->parsed_partition_boundaries();
+ if (parsed != nullptr && !parsed->empty()) {
+ _do_partition_pruning_by_rf();
+ }
+}
+
+void ScanLocalStateBase::_do_partition_pruning_by_rf() {
+ if (!_state->query_options().enable_runtime_filter_partition_prune) {
+ return;
+ }
+ const auto* parsed = _parent->parsed_partition_boundaries();
+ if (parsed == nullptr || parsed->empty()) {
+ return;
+ }
+ int64_t newly_pruned = _rf_partition_pruner.prune_by_runtime_filters(
+ *parsed, _conjuncts, _parent->runtime_filter_descs(), _parent->node_id());
+ if (newly_pruned > 0) {
+ COUNTER_SET(_partitions_pruned_by_rf_counter,
+ _rf_partition_pruner.pruned_partition_count());
+ }
+}
+
int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const {
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
@@ -190,7 +226,11 @@
RETURN_IF_ERROR(
p._common_expr_ctxs_push_down[i]->clone(state, _common_expr_ctxs_push_down[i]));
}
+ size_t conjuncts_before = _conjuncts.size();
RETURN_IF_ERROR(_helper.acquire_runtime_filter(state, _conjuncts, p.row_descriptor()));
+ if (_conjuncts.size() > conjuncts_before) {
+ _on_runtime_filter_update();
+ }
// Disable condition cache in topn filter valid. TODO:: Try to support the topn filter in condition cache
if (state->query_options().condition_cache_digest && p._topn_filter_source_node_ids.empty()) {
@@ -1073,6 +1113,11 @@
_condition_cache_filtered_rows_counter =
ADD_COUNTER(_scanner_profile, "ConditionCacheFilteredRows", TUnit::UNIT);
+ _partitions_pruned_by_rf_counter =
+ ADD_COUNTER(custom_profile(), "PartitionsPrunedByRuntimeFilter", TUnit::UNIT);
+ _total_partitions_rf_counter =
+ ADD_COUNTER(custom_profile(), "TotalPartitionsForRFPruning", TUnit::UNIT);
+
// Rows read from storage.
// Include the rows read from doris page cache.
_scan_rows = ADD_COUNTER_WITH_LEVEL(custom_profile(), profile::SCAN_ROWS, TUnit::UNIT, 1);
diff --git a/be/src/exec/operator/scan_operator.h b/be/src/exec/operator/scan_operator.h
index 2551f76..39ba630 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -28,6 +28,7 @@
#include "exec/operator/operator.h"
#include "exec/pipeline/dependency.h"
#include "exec/runtime_filter/runtime_filter_consumer_helper.h"
+#include "exec/runtime_filter/runtime_filter_partition_pruner.h"
#include "exec/scan/scan_node.h"
#include "exec/scan/scanner_context.h"
#include "exprs/function_filter.h"
@@ -84,6 +85,10 @@
[[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
[[nodiscard]] virtual ScannerScheduler* scan_scheduler(RuntimeState* state) const;
+ // Thread-safe check whether a partition has been pruned by runtime filter.
+ // Callable from any scan type's scanner in scheduling threads.
+ bool is_partition_pruned(int64_t partition_id) const;
+
[[nodiscard]] std::string get_name() { return _parent->get_name(); }
uint64_t get_condition_cache_digest() const { return _condition_cache_digest; }
@@ -98,6 +103,13 @@
virtual Status _init_profile() = 0;
+ // Hook for subclasses to react after new runtime filters are appended.
+ // Called inside update_late_arrival_runtime_filter() while _conjuncts_lock is held.
+ // Default implementation runs partition pruning on the newly appended RFs.
+ virtual void _on_runtime_filter_update();
+
+ void _do_partition_pruning_by_rf();
+
std::atomic<bool> _opened {false};
DependencySPtr _scan_dependency = nullptr;
@@ -133,6 +145,11 @@
RuntimeProfile::Counter* _condition_cache_hit_counter = nullptr;
RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr;
+ // ---- Runtime-filter partition pruning (scan-agnostic) ----
+ RuntimeFilterPartitionPruner _rf_partition_pruner;
+ RuntimeProfile::Counter* _partitions_pruned_by_rf_counter = nullptr;
+ RuntimeProfile::Counter* _total_partitions_rf_counter = nullptr;
+
// Moved from ScanLocalState<Derived> to avoid re-instantiation for each Derived type.
std::atomic<bool> _eos = false;
int _max_pushdown_conditions_per_column = 1024;
@@ -279,7 +296,10 @@
friend class Scanner;
Status _init_profile() override;
- virtual Status _process_conjuncts(RuntimeState* state) { return _normalize_conjuncts(state); }
+ virtual Status _process_conjuncts(RuntimeState* state) {
+ _do_partition_pruning_by_rf();
+ return _normalize_conjuncts(state);
+ }
virtual bool _should_push_down_common_expr() { return false; }
virtual bool _storage_no_merge() { return false; }
@@ -363,6 +383,16 @@
return _runtime_filter_descs;
}
+ // Expose this operator's per-fragment shared partition-boundary parse
+ // result to the non-templated ScanLocalStateBase so it can drive runtime
+ // filter partition pruning without down-casting to a specific scan type.
+ // Subclasses are expected to populate `_parsed_partition_boundaries` from
+ // their own partition-boundary thrift field inside their `prepare()`
+ // override before any LocalState observes the result.
+ const ParsedPartitionBoundaries* parsed_partition_boundaries() const override {
+ return &_parsed_partition_boundaries;
+ }
+
[[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
@@ -440,6 +470,12 @@
std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr;
std::shared_ptr<MemLimiter> _mem_limiter = nullptr;
+
+ // Shared parse result of partition boundaries for runtime-filter partition
+ // pruning. Lives here (rather than on the Olap-specific subclass) so any
+ // future scan type can populate it in its `prepare()` override and reuse
+ // the generic pruning machinery in ScanLocalStateBase.
+ ParsedPartitionBoundaries _parsed_partition_boundaries;
};
} // namespace doris
diff --git a/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp
new file mode 100644
index 0000000..527a2a5e
--- /dev/null
+++ b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp
@@ -0,0 +1,749 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/runtime_filter/runtime_filter_partition_pruner.h"
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <optional>
+#include <utility>
+
+#include "core/block/block.h"
+#include "core/column/column.h"
+#include "core/column/column_nullable.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/field.h"
+#include "exprs/bloom_filter_func.h"
+#include "exprs/hybrid_set.h"
+#include "exprs/vexpr.h"
+#include "exprs/vexpr_context.h"
+#include "exprs/vliteral.h"
+#include "exprs/vruntimefilter_wrapper.h"
+#include "exprs/vslot_ref.h"
+#include "runtime/descriptors.h"
+
+namespace doris {
+
+// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size)
+// Complexity is inflated by macro expansion for each PrimitiveType case.
+void ParsedPartitionBoundaries::parse(
+ const std::vector<TPartitionBoundary>& boundaries,
+ const phmap::flat_hash_map<int, SlotDescriptor*>& slot_descs) {
+ for (const auto& tb : boundaries) {
+ if (!tb.__isset.partition_id || !tb.__isset.slot_id) {
+ continue;
+ }
+ SlotId slot_id = tb.slot_id;
+
+ auto slot_it = slot_descs.find(slot_id);
+ if (slot_it == slot_descs.end()) {
+ continue;
+ }
+ SlotDescriptor* slot = slot_it->second;
+ // Reuse the slot's pre-built DataType: walking through VLiteral here
+ // would cost a `DataTypeFactory::create_data_type(node)` heap allocation
+ // and a one-row `ColumnConst` allocation per boundary endpoint. With
+ // thousands of partitions that dominates BuildTasksTime.
+ const DataTypePtr& slot_type = slot->type();
+ PrimitiveType ptype = slot_type->get_primitive_type();
+ int precision = cast_set<int>(slot_type->get_precision());
+ int scale = cast_set<int>(slot_type->get_scale());
+ bool is_nullable = slot->is_nullable();
+
+ // Store slot data type for potential projection use
+ _slot_data_types[slot_id] = slot_type;
+
+ ParsedBoundary boundary;
+ boundary.partition_id = tb.partition_id;
+ boundary.slot_id = slot_id;
+ boundary.is_nullable = is_nullable;
+
+ bool parsed_ok = false;
+
+#define BUILD_BOUNDARY_CVR(NAME) \
+ case TYPE_##NAME: { \
+ using CppType = typename PrimitiveTypeTraits<TYPE_##NAME>::CppType; \
+ bool is_list = tb.__isset.list_values && !tb.list_values.empty(); \
+ bool is_range = tb.__isset.range_start || tb.__isset.range_end; \
+ if (!is_list && !is_range) break; \
+ ColumnValueRange<TYPE_##NAME> cvr(slot->col_name(), is_nullable, precision, scale); \
+ /* Returns nullopt if `node` is a NULL literal; the caller then sets contain_null */ \
+ /* on the CVR instead of trying to extract a typed value (which would dereference */ \
+ /* a null data pointer for the non-string branch). */ \
+ auto parse_texpr_node = [&](const TExprNode& node) -> std::optional<CppType> { \
+ if (node.node_type == TExprNodeType::NULL_LITERAL) { \
+ return std::nullopt; \
+ } \
+ /* `Field` value is copied into the CVR by `add_fixed_value` / */ \
+ /* `add_range` (both take CppType by const-ref / by value), so the */ \
+ /* temporary `Field`'s lifetime ending at this expression's full-statement */ \
+ /* boundary is safe -- including for `String` payloads. */ \
+ Field field = slot_type->get_field(node); \
+ return std::make_optional<CppType>(field.get<TYPE_##NAME>()); \
+ }; \
+ if (is_list) { \
+ auto empty_cvr = ColumnValueRange<TYPE_##NAME>::create_empty_column_value_range( \
+ is_nullable, precision, scale); \
+ bool list_has_null = false; \
+ bool list_has_value = false; \
+ for (const auto& node : tb.list_values) { \
+ auto parsed = parse_texpr_node(node); \
+ if (!parsed) { \
+ list_has_null = true; \
+ continue; \
+ } \
+ static_cast<void>(empty_cvr.add_fixed_value(*parsed)); \
+ list_has_value = true; \
+ } \
+ if (list_has_value) { \
+ cvr.intersection(empty_cvr); \
+ } \
+ if (list_has_null && is_nullable) { \
+ /* Track NULL membership on ParsedBoundary; calling */ \
+ /* cvr.set_contain_null(true) here would invoke */ \
+ /* set_empty_value_range() and discard the concrete fixed */ \
+ /* values we just inserted, turning {NULL, v} into a */ \
+ /* NULL-only boundary. */ \
+ boundary.contains_null = true; \
+ if (!list_has_value) { \
+ boundary.only_null = true; \
+ } \
+ } \
+ } else { \
+ if (tb.__isset.range_start) { \
+ auto parsed = parse_texpr_node(tb.range_start); \
+ if (parsed) { \
+ static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, *parsed)); \
+ } \
+ } \
+ if (tb.__isset.range_end) { \
+ auto parsed = parse_texpr_node(tb.range_end); \
+ if (parsed) { \
+ /* Multi-column RANGE projection emits a CLOSED upper bound (see */ \
+ /* TPartitionBoundary.range_end_inclusive comment); single-column RANGE */ \
+ /* keeps the natural OPEN upper bound matching Doris semantics. */ \
+ SQLFilterOp upper_op = \
+ (tb.__isset.range_end_inclusive && tb.range_end_inclusive) \
+ ? FILTER_LESS_OR_EQUAL \
+ : FILTER_LESS; \
+ static_cast<void>(cvr.add_range(upper_op, *parsed)); \
+ } \
+ } \
+ } \
+ boundary.boundary_cvr = std::move(cvr); \
+ parsed_ok = true; \
+ break; \
+ }
+
+ switch (ptype) {
+ BUILD_BOUNDARY_CVR(TINYINT)
+ BUILD_BOUNDARY_CVR(SMALLINT)
+ BUILD_BOUNDARY_CVR(INT)
+ BUILD_BOUNDARY_CVR(BIGINT)
+ BUILD_BOUNDARY_CVR(LARGEINT)
+ BUILD_BOUNDARY_CVR(FLOAT)
+ BUILD_BOUNDARY_CVR(DOUBLE)
+ BUILD_BOUNDARY_CVR(CHAR)
+ BUILD_BOUNDARY_CVR(DATE)
+ BUILD_BOUNDARY_CVR(DATETIME)
+ BUILD_BOUNDARY_CVR(DATEV2)
+ BUILD_BOUNDARY_CVR(DATETIMEV2)
+ BUILD_BOUNDARY_CVR(TIMESTAMPTZ)
+ BUILD_BOUNDARY_CVR(VARCHAR)
+ BUILD_BOUNDARY_CVR(STRING)
+ BUILD_BOUNDARY_CVR(DECIMAL32)
+ BUILD_BOUNDARY_CVR(DECIMAL64)
+ BUILD_BOUNDARY_CVR(DECIMAL128I)
+ BUILD_BOUNDARY_CVR(DECIMAL256)
+ BUILD_BOUNDARY_CVR(DECIMALV2)
+ BUILD_BOUNDARY_CVR(BOOLEAN)
+ BUILD_BOUNDARY_CVR(IPV4)
+ BUILD_BOUNDARY_CVR(IPV6)
+ default:
+ break;
+ }
+#undef BUILD_BOUNDARY_CVR
+
+ if (parsed_ok) {
+ _partition_column_slot_ids.insert(slot_id);
+ _slot_to_boundaries[slot_id].push_back(std::move(boundary));
+ }
+ }
+
+ // Count distinct partition IDs across all boundaries.
+ if (!_partition_column_slot_ids.empty()) {
+ phmap::flat_hash_set<int64_t> all_partition_ids;
+ for (const auto& [_, slot_boundaries] : _slot_to_boundaries) {
+ for (const auto& pb : slot_boundaries) {
+ all_partition_ids.insert(pb.partition_id);
+ }
+ }
+ _total_partition_count = static_cast<int64_t>(all_partition_ids.size());
+ }
+}
+// NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
+
+static const VSlotRef* find_unique_slot_ref(const VExpr* expr) {
+ if (!expr) {
+ return nullptr;
+ }
+ if (expr->is_slot_ref()) {
+ return assert_cast<const VSlotRef*>(expr);
+ }
+ const VSlotRef* found = nullptr;
+ for (const auto& child : expr->children()) {
+ const VSlotRef* c = find_unique_slot_ref(child.get());
+ if (c) {
+ if (found) {
+ return nullptr; // multiple slot refs, can't handle
+ }
+ found = c;
+ }
+ }
+ return found;
+}
+
+// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size)
+const std::vector<ParsedBoundary>* ParsedPartitionBoundaries::get_or_compute_projection(
+ int filter_id, const VExprSPtr& target_expr, SlotId leaf_slot_id, int leaf_column_id,
+ TTargetExprMonotonicity::type direction, VExprContext* ctx) const {
+ {
+ std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+ auto it = _projection_cache.find(filter_id);
+ if (it != _projection_cache.end()) {
+ return &it->second;
+ }
+ }
+
+ // Build projection
+ std::vector<ParsedBoundary> projected;
+
+ auto slot_boundaries_it = _slot_to_boundaries.find(leaf_slot_id);
+ if (slot_boundaries_it == _slot_to_boundaries.end()) {
+ std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+ _projection_cache[filter_id] = std::move(projected);
+ return &_projection_cache[filter_id];
+ }
+
+ const auto& orig_boundaries = slot_boundaries_it->second;
+ if (orig_boundaries.empty()) {
+ std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+ _projection_cache[filter_id] = std::move(projected);
+ return &_projection_cache[filter_id];
+ }
+
+ auto slot_type_it = _slot_data_types.find(leaf_slot_id);
+ if (slot_type_it == _slot_data_types.end()) {
+ std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+ _projection_cache[filter_id] = std::move(projected);
+ return &_projection_cache[filter_id];
+ }
+
+ // LIST partitions: TODO(rf-partition-prune) -- projecting through a
+ // monotonic function preserves set membership but requires re-grouping
+ // the projected values back per-partition. Skip for now (caller treats
+ // empty cache as "no projection available").
+ {
+ bool any_list = false;
+ std::visit([&](const auto& cvr) { any_list = cvr.is_fixed_value_range(); },
+ orig_boundaries[0].boundary_cvr);
+ if (any_list) {
+ std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+ _projection_cache[filter_id] = std::move(projected);
+ return &_projection_cache[filter_id];
+ }
+ }
+
+ const DataTypePtr& input_type = slot_type_it->second;
+ bool input_is_nullable = input_type->is_nullable();
+ DataTypePtr inner_type = input_is_nullable ? remove_nullable(input_type) : input_type;
+ PrimitiveType input_ptype = input_type->get_primitive_type();
+
+ size_t N = orig_boundaries.size();
+
+ // Track open bounds (low_value == TYPE_MIN or high_value == TYPE_MAX)
+ // externally so we can propagate null_in_projection regardless of whether
+ // the input column is nullable.
+ std::vector<bool> ext_null_lo(N, false);
+ std::vector<bool> ext_null_hi(N, false);
+
+ // Build input blocks for lo and hi
+ Block lo_block;
+ Block hi_block;
+
+ // Pad columns 0..leaf_column_id-1 with placeholder columns so the leaf
+ // VSlotRef (whose column_id is leaf_column_id) reads our typed column.
+ for (int col_idx = 0; col_idx < leaf_column_id; ++col_idx) {
+ auto add_dummy = [&](Block& blk) {
+ auto col = ColumnUInt8::create(N, 0);
+ blk.insert({std::move(col), std::make_shared<DataTypeUInt8>(),
+ fmt::format("dummy_{}", col_idx)});
+ };
+ add_dummy(lo_block);
+ add_dummy(hi_block);
+ }
+
+ // Macro-dispatch on input PrimitiveType to build the typed value column
+ // for each side. Inserts NULL (when input column is nullable) or default
+ // (when not) for boundaries that are only_null / open / type-mismatched;
+ // ext_null_lo/hi tracks these so we can mark them null_in_projection
+ // after executing the projection.
+ bool input_built = false;
+#define BUILD_INPUT_COLUMNS(INPUT_PT) \
+ case TYPE_##INPUT_PT: { \
+ using InCol = typename PrimitiveTypeTraits<TYPE_##INPUT_PT>::ColumnType; \
+ MutableColumnPtr lo_inner_base = inner_type->create_column(); \
+ MutableColumnPtr hi_inner_base = inner_type->create_column(); \
+ auto* lo_inner = assert_cast<InCol*>(lo_inner_base.get()); \
+ auto* hi_inner = assert_cast<InCol*>(hi_inner_base.get()); \
+ lo_inner->reserve(N); \
+ hi_inner->reserve(N); \
+ auto lo_nulls = ColumnUInt8::create(); \
+ auto hi_nulls = ColumnUInt8::create(); \
+ lo_nulls->reserve(N); \
+ hi_nulls->reserve(N); \
+ for (size_t i = 0; i < N; ++i) { \
+ const auto& boundary = orig_boundaries[i]; \
+ bool null_lo = boundary.only_null; \
+ bool null_hi = boundary.only_null; \
+ const auto* cvr = \
+ std::get_if<ColumnValueRange<TYPE_##INPUT_PT>>(&boundary.boundary_cvr); \
+ if (cvr == nullptr) { \
+ null_lo = null_hi = true; \
+ } else { \
+ if (cvr->is_low_value_minimum()) null_lo = true; \
+ if (cvr->is_high_value_maximum()) null_hi = true; \
+ } \
+ ext_null_lo[i] = null_lo; \
+ ext_null_hi[i] = null_hi; \
+ if (null_lo) { \
+ lo_inner->insert_default(); \
+ lo_nulls->get_data().push_back(1); \
+ } else { \
+ lo_inner->insert_value(cvr->get_range_min_value()); \
+ lo_nulls->get_data().push_back(0); \
+ } \
+ if (null_hi) { \
+ hi_inner->insert_default(); \
+ hi_nulls->get_data().push_back(1); \
+ } else { \
+ hi_inner->insert_value(cvr->get_range_max_value()); \
+ hi_nulls->get_data().push_back(0); \
+ } \
+ } \
+ if (input_is_nullable) { \
+ auto lo_col = ColumnNullable::create(std::move(lo_inner_base), std::move(lo_nulls)); \
+ auto hi_col = ColumnNullable::create(std::move(hi_inner_base), std::move(hi_nulls)); \
+ lo_block.insert({std::move(lo_col), input_type, "leaf_slot"}); \
+ hi_block.insert({std::move(hi_col), input_type, "leaf_slot"}); \
+ } else { \
+ lo_block.insert({std::move(lo_inner_base), input_type, "leaf_slot"}); \
+ hi_block.insert({std::move(hi_inner_base), input_type, "leaf_slot"}); \
+ } \
+ input_built = true; \
+ break; \
+ }
+
+ switch (input_ptype) {
+ BUILD_INPUT_COLUMNS(TINYINT)
+ BUILD_INPUT_COLUMNS(SMALLINT)
+ BUILD_INPUT_COLUMNS(INT)
+ BUILD_INPUT_COLUMNS(BIGINT)
+ BUILD_INPUT_COLUMNS(LARGEINT)
+ BUILD_INPUT_COLUMNS(FLOAT)
+ BUILD_INPUT_COLUMNS(DOUBLE)
+ BUILD_INPUT_COLUMNS(CHAR)
+ BUILD_INPUT_COLUMNS(DATE)
+ BUILD_INPUT_COLUMNS(DATETIME)
+ BUILD_INPUT_COLUMNS(DATEV2)
+ BUILD_INPUT_COLUMNS(DATETIMEV2)
+ BUILD_INPUT_COLUMNS(TIMESTAMPTZ)
+ BUILD_INPUT_COLUMNS(VARCHAR)
+ BUILD_INPUT_COLUMNS(STRING)
+ BUILD_INPUT_COLUMNS(DECIMAL32)
+ BUILD_INPUT_COLUMNS(DECIMAL64)
+ BUILD_INPUT_COLUMNS(DECIMAL128I)
+ BUILD_INPUT_COLUMNS(DECIMAL256)
+ BUILD_INPUT_COLUMNS(DECIMALV2)
+ BUILD_INPUT_COLUMNS(BOOLEAN)
+ BUILD_INPUT_COLUMNS(IPV4)
+ BUILD_INPUT_COLUMNS(IPV6)
+ default:
+ break;
+ }
+#undef BUILD_INPUT_COLUMNS
+
+ if (!input_built) {
+ std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+ _projection_cache[filter_id] = std::move(projected);
+ return &_projection_cache[filter_id];
+ }
+
+ int lo_result_id = -1;
+ int hi_result_id = -1;
+ auto status_lo = target_expr->execute(ctx, &lo_block, &lo_result_id);
+ auto status_hi = target_expr->execute(ctx, &hi_block, &hi_result_id);
+ if (!status_lo.ok() || !status_hi.ok() || lo_result_id < 0 || hi_result_id < 0) {
+ std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+ _projection_cache[filter_id] = std::move(projected);
+ return &_projection_cache[filter_id];
+ }
+
+ const auto& lo_result_col = lo_block.get_by_position(lo_result_id).column;
+ const auto& hi_result_col = hi_block.get_by_position(hi_result_id).column;
+
+ int out_precision = cast_set<int>(target_expr->data_type()->get_precision());
+ int out_scale = cast_set<int>(target_expr->data_type()->get_scale());
+ bool out_nullable = target_expr->data_type()->is_nullable();
+ PrimitiveType output_ptype = target_expr->data_type()->get_primitive_type();
+
+ projected.resize(N);
+
+#define BUILD_PROJECTED_CVR(OUTPUT_PT) \
+ case TYPE_##OUTPUT_PT: { \
+ using OutputCppType = typename PrimitiveTypeTraits<TYPE_##OUTPUT_PT>::CppType; \
+ for (size_t i = 0; i < N; ++i) { \
+ projected[i].partition_id = orig_boundaries[i].partition_id; \
+ projected[i].slot_id = leaf_slot_id; \
+ projected[i].is_nullable = out_nullable; \
+ projected[i].only_null = orig_boundaries[i].only_null; \
+ projected[i].contains_null = orig_boundaries[i].contains_null; \
+ bool lo_is_null = ext_null_lo[i] || lo_result_col->is_null_at(i); \
+ bool hi_is_null = ext_null_hi[i] || hi_result_col->is_null_at(i); \
+ if (lo_is_null || hi_is_null) { \
+ projected[i].null_in_projection = true; \
+ ColumnValueRange<TYPE_##OUTPUT_PT> full_range("", out_nullable, out_precision, \
+ out_scale); \
+ projected[i].boundary_cvr = std::move(full_range); \
+ } else { \
+ Field lo_field = (*lo_result_col)[i]; \
+ Field hi_field = (*hi_result_col)[i]; \
+ OutputCppType lo_projected = lo_field.get<TYPE_##OUTPUT_PT>(); \
+ OutputCppType hi_projected = hi_field.get<TYPE_##OUTPUT_PT>(); \
+ ColumnValueRange<TYPE_##OUTPUT_PT> cvr("", out_nullable, out_precision, \
+ out_scale); \
+ if (direction == TTargetExprMonotonicity::MONOTONIC_DECREASING) { \
+ static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, hi_projected)); \
+ static_cast<void>(cvr.add_range(FILTER_LESS_OR_EQUAL, lo_projected)); \
+ } else { \
+ static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, lo_projected)); \
+ static_cast<void>(cvr.add_range(FILTER_LESS_OR_EQUAL, hi_projected)); \
+ } \
+ projected[i].boundary_cvr = std::move(cvr); \
+ } \
+ } \
+ break; \
+ }
+
+ switch (output_ptype) {
+ BUILD_PROJECTED_CVR(TINYINT)
+ BUILD_PROJECTED_CVR(SMALLINT)
+ BUILD_PROJECTED_CVR(INT)
+ BUILD_PROJECTED_CVR(BIGINT)
+ BUILD_PROJECTED_CVR(LARGEINT)
+ BUILD_PROJECTED_CVR(FLOAT)
+ BUILD_PROJECTED_CVR(DOUBLE)
+ BUILD_PROJECTED_CVR(CHAR)
+ BUILD_PROJECTED_CVR(DATE)
+ BUILD_PROJECTED_CVR(DATETIME)
+ BUILD_PROJECTED_CVR(DATEV2)
+ BUILD_PROJECTED_CVR(DATETIMEV2)
+ BUILD_PROJECTED_CVR(TIMESTAMPTZ)
+ BUILD_PROJECTED_CVR(VARCHAR)
+ BUILD_PROJECTED_CVR(STRING)
+ BUILD_PROJECTED_CVR(DECIMAL32)
+ BUILD_PROJECTED_CVR(DECIMAL64)
+ BUILD_PROJECTED_CVR(DECIMAL128I)
+ BUILD_PROJECTED_CVR(DECIMAL256)
+ BUILD_PROJECTED_CVR(DECIMALV2)
+ BUILD_PROJECTED_CVR(BOOLEAN)
+ BUILD_PROJECTED_CVR(IPV4)
+ BUILD_PROJECTED_CVR(IPV6)
+ default:
+ break;
+ }
+
+#undef BUILD_PROJECTED_CVR
+
+ std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+ _projection_cache[filter_id] = std::move(projected);
+ return &_projection_cache[filter_id];
+}
+// NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
+
+// Translate a thrift comparison opcode into the SQLFilterOp used to build a
+// ColumnValueRange for a MinMax runtime filter bound. Any opcode other than
+// LE/LT/GE/GT yields FILTER_IN, which is purely a "not a comparison" sentinel:
+// callers must check for it and skip the predicate (see
+// RuntimeFilterPartitionPruner::_try_prune_by_single_rf).
+static SQLFilterOp convert_opcode_to_filter_op(TExprOpcode::type op) {
+    switch (op) {
+    case TExprOpcode::LE:
+        return FILTER_LESS_OR_EQUAL;
+    case TExprOpcode::LT:
+        return FILTER_LESS;
+    case TExprOpcode::GE:
+        return FILTER_LARGER_OR_EQUAL;
+    case TExprOpcode::GT:
+        return FILTER_LARGER;
+    default:
+        return FILTER_IN; // sentinel: caller should skip
+    }
+}
+
+// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size)
+// Evaluate one runtime filter implementation (`impl`) against every parsed
+// partition boundary in `boundaries` and add the ids of partitions whose
+// value range provably cannot intersect the filter to `newly_pruned`.
+// Supports IN filters (HybridSet) and MinMax filters (binary predicate with a
+// literal bound). Any other shape is skipped, i.e. the partition is
+// conservatively kept. Partitions already present in _pruned_partition_ids or
+// `newly_pruned` are not re-examined.
+void RuntimeFilterPartitionPruner::_try_prune_by_single_rf(
+        const std::vector<ParsedBoundary>& boundaries, const VExprSPtr& impl,
+        phmap::flat_hash_set<int64_t>& newly_pruned) {
+    // Pre-compute whether the RF "matches NULL" -- i.e. whether the RF would
+    // accept a row whose probe value is NULL. The signal differs by RF impl:
+    //   * IN filter     → HybridSet::contain_null()
+    //   * Bloom filter  → BloomFilterFuncBase::contain_null()
+    //   * MinMax filter → encoded via the BinaryPredicate node_type:
+    //                     NULL_AWARE_BINARY_PRED means the build side had a
+    //                     NULL key and a null-safe equal join asked NULL to
+    //                     match NULL; plain BINARY_PRED never matches NULL.
+    // FilterBase::contain_null() already folds in `_null_aware`, so we only
+    // get a true result when the build side is actually null-aware AND
+    // produced a NULL value.
+    bool rf_contains_null = false;
+    if (auto hybrid_set = impl->get_set_func()) {
+        rf_contains_null = hybrid_set->contain_null();
+    } else if (impl->node_type() == TExprNodeType::BLOOM_PRED) {
+        auto bloom = impl->get_bloom_filter_func();
+        rf_contains_null = bloom && bloom->contain_null();
+    } else if (impl->node_type() == TExprNodeType::NULL_AWARE_BINARY_PRED) {
+        // Min/Max RF built on a null-safe equal join. The literal child holds
+        // the min or max bound; the NULL semantic is conveyed by the node
+        // type itself (see create_vbin_predicate in runtime_filter/utils.cpp).
+        rf_contains_null = true;
+    }
+
+    for (const auto& pb : boundaries) {
+        if (_pruned_partition_ids.contains(pb.partition_id) ||
+            newly_pruned.contains(pb.partition_id)) {
+            continue;
+        }
+
+        // Skip partitions with NULL projection conservatively
+        if (pb.null_in_projection) {
+            continue;
+        }
+
+        // NULL handling:
+        // A partition row whose key is NULL matches the RF iff `rf_contains_null`.
+        // - only_null partition (rows are exclusively NULL): prunable iff !rf_contains_null.
+        // - mixed (NULL + concrete values): if rf_contains_null, NULL rows alone
+        //   prevent pruning. Otherwise NULL rows can never match, so we ignore
+        //   the NULL portion and let the regular non-NULL intersection decide.
+        if (pb.only_null) {
+            if (!rf_contains_null) {
+                newly_pruned.insert(pb.partition_id);
+            }
+            continue;
+        }
+        if (pb.contains_null && rf_contains_null) {
+            continue;
+        }
+
+        std::visit(
+                [&](const auto& boundary_cvr) {
+                    using CvrType = std::decay_t<decltype(boundary_cvr)>;
+                    using CppType = typename CvrType::CppType;
+
+                    auto hybrid_set = impl->get_set_func();
+                    if (hybrid_set) {
+                        // IN filter: build a fixed-value CVR from the HybridSet
+                        auto rf_cvr = CvrType::create_empty_column_value_range(
+                                pb.is_nullable, boundary_cvr.precision(), boundary_cvr.scale());
+                        auto* iter = hybrid_set->begin();
+                        while (iter->has_next()) {
+                            const void* value = iter->get_value();
+                            if (value) {
+                                if constexpr (std::is_same_v<CppType, String>) {
+                                    // String HybridSets store StringRef*, but
+                                    // ColumnValueRange<String>::CppType is
+                                    // std::string. Construct from the bytes.
+                                    const auto* str_val = reinterpret_cast<const StringRef*>(value);
+                                    static_cast<void>(rf_cvr.add_fixed_value(
+                                            CppType(str_val->data, str_val->size)));
+                                } else if constexpr (std::is_same_v<CppType, StringRef>) {
+                                    const auto* str_val = reinterpret_cast<const StringRef*>(value);
+                                    static_cast<void>(rf_cvr.add_fixed_value(
+                                            CppType(str_val->data, str_val->size)));
+                                } else {
+                                    static_cast<void>(rf_cvr.add_fixed_value(
+                                            *reinterpret_cast<const CppType*>(value)));
+                                }
+                            }
+                            iter->next();
+                        }
+                        auto boundary_copy = boundary_cvr;
+                        boundary_copy.intersection(rf_cvr);
+                        if (boundary_copy.is_empty_value_range()) {
+                            newly_pruned.insert(pb.partition_id);
+                        }
+                    } else if ((impl->node_type() == TExprNodeType::BINARY_PRED ||
+                                impl->node_type() == TExprNodeType::NULL_AWARE_BINARY_PRED) &&
+                               impl->children().size() == 2 && impl->children()[1]->is_literal()) {
+                        // MinMax filter: binary pred with literal bound
+                        auto* literal = assert_cast<VLiteral*>(impl->children()[1].get());
+                        auto col_ptr = literal->get_column_ptr();
+                        // NOTE(review): assumes row 0 of the literal column is a
+                        // concrete (non-NULL) bound; a NULL here would leave
+                        // data.data invalid for the reinterpret_cast below.
+                        // Confirm MinMax RFs never publish a NULL bound.
+                        auto data = col_ptr->get_data_at(0);
+                        CppType val {};
+                        if constexpr (std::is_same_v<CppType, String>) {
+                            // get_data_at returns a StringRef pointing at the
+                            // raw character bytes; ColumnValueRange<String>'s
+                            // CppType is std::string, so construct one from
+                            // the bytes rather than dereferencing as String.
+                            val = CppType(data.data, data.size);
+                        } else if constexpr (std::is_same_v<CppType, StringRef>) {
+                            val = CppType(data.data, data.size);
+                        } else {
+                            val = *reinterpret_cast<const CppType*>(data.data);
+                        }
+
+                        SQLFilterOp op = convert_opcode_to_filter_op(impl->op());
+                        if (op == FILTER_IN) {
+                            return; // unrecognized opcode, skip
+                        }
+
+                        CvrType rf_cvr(boundary_cvr.column_name(), pb.is_nullable,
+                                       boundary_cvr.precision(), boundary_cvr.scale());
+                        static_cast<void>(rf_cvr.add_range(op, val));
+
+                        auto boundary_copy = boundary_cvr;
+                        boundary_copy.intersection(rf_cvr);
+                        if (boundary_copy.is_empty_value_range()) {
+                            newly_pruned.insert(pb.partition_id);
+                        }
+                    }
+                },
+                pb.boundary_cvr);
+    }
+}
+
+// Walk the pushed-down conjuncts looking for runtime-filter wrappers and try
+// partition pruning with each one.
+//   parsed       -- fragment-shared parse of the partition boundaries
+//   conjuncts    -- scan conjuncts; only VRuntimeFilterWrapper roots are used
+//   rf_descs     -- FE runtime filter descriptors carrying per-scan-node
+//                   target monotonicity metadata
+//   scan_node_id -- selects this scan's entry in that metadata
+// Two target shapes are handled: an identity SlotRef on a partition column,
+// and a non-identity target the FE classified as monotonic (projected lazily
+// via ParsedPartitionBoundaries::get_or_compute_projection). Newly pruned
+// partition ids are folded into _pruned_partition_ids under _prune_mutex;
+// returns how many were newly pruned by this call.
+int64_t RuntimeFilterPartitionPruner::prune_by_runtime_filters(
+        const ParsedPartitionBoundaries& parsed, const VExprContextSPtrs& conjuncts,
+        const std::vector<TRuntimeFilterDesc>& rf_descs, int scan_node_id) {
+    if (parsed.empty()) {
+        return 0;
+    }
+    const auto& partition_column_slot_ids = parsed.partition_column_slot_ids();
+
+    // Build filter_id -> monotonicity map
+    std::unordered_map<int, TTargetExprMonotonicity::type> filter_id_to_monotonicity;
+    for (const auto& desc : rf_descs) {
+        if (desc.__isset.planId_to_target_monotonicity) {
+            auto it = desc.planId_to_target_monotonicity.find(scan_node_id);
+            if (it != desc.planId_to_target_monotonicity.end()) {
+                filter_id_to_monotonicity[desc.filter_id] = it->second;
+            }
+        }
+    }
+
+    // This function is serialized by _conjuncts_lock in the caller, so our reads
+    // of _pruned_partition_ids never race with our writes below. The only concurrent
+    // readers are is_partition_pruned() calls (under shared_lock), which are
+    // properly synchronized by the unique_lock we take when inserting.
+    phmap::flat_hash_set<int64_t> newly_pruned;
+
+    for (const auto& conjunct_ctx : conjuncts) {
+        VExprSPtr root = conjunct_ctx->root();
+        if (!root->is_rf_wrapper()) {
+            continue;
+        }
+
+        VExprSPtr impl = root->get_impl();
+        if (!impl) {
+            continue;
+        }
+
+        if (impl->children().empty()) {
+            continue;
+        }
+
+        auto* wrapper_root = assert_cast<VRuntimeFilterWrapper*>(root.get());
+        int filter_id = wrapper_root->filter_id();
+
+        VExprSPtr target_subtree = impl->children()[0];
+
+        // Identity case: target is a simple SlotRef on a partition column
+        if (target_subtree->is_slot_ref()) {
+            auto* slot_ref = assert_cast<VSlotRef*>(target_subtree.get());
+            SlotId slot_id = slot_ref->slot_id();
+            if (!partition_column_slot_ids.contains(slot_id)) {
+                continue;
+            }
+
+            const auto& slot_to_boundaries = parsed.slot_to_boundaries();
+            auto boundaries_it = slot_to_boundaries.find(slot_id);
+            if (boundaries_it == slot_to_boundaries.end()) {
+                continue;
+            }
+
+            _try_prune_by_single_rf(boundaries_it->second, impl, newly_pruned);
+        } else {
+            // Non-identity case: check if it's a monotonic target
+            auto mono_it = filter_id_to_monotonicity.find(filter_id);
+            if (mono_it == filter_id_to_monotonicity.end() ||
+                mono_it->second == TTargetExprMonotonicity::NON_MONOTONIC) {
+                continue;
+            }
+
+            const VSlotRef* leaf_slot = find_unique_slot_ref(target_subtree.get());
+            if (!leaf_slot) {
+                continue;
+            }
+
+            SlotId leaf_slot_id = leaf_slot->slot_id();
+            if (!partition_column_slot_ids.contains(leaf_slot_id)) {
+                continue;
+            }
+
+            int leaf_column_id = leaf_slot->column_id();
+            TTargetExprMonotonicity::type direction = mono_it->second;
+
+            const std::vector<ParsedBoundary>* projected =
+                    parsed.get_or_compute_projection(filter_id, target_subtree, leaf_slot_id,
+                                                     leaf_column_id, direction, conjunct_ctx.get());
+
+            if (projected == nullptr || projected->empty()) {
+                continue;
+            }
+
+            _try_prune_by_single_rf(*projected, impl, newly_pruned);
+        }
+    }
+
+    auto count = static_cast<int64_t>(newly_pruned.size());
+    if (count > 0) {
+        std::unique_lock lock(_prune_mutex);
+        for (int64_t pid : newly_pruned) {
+            _pruned_partition_ids.insert(pid);
+        }
+    }
+    return count;
+}
+// NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
+
+// Thread-safe query used by scanners: has `partition_id` already been pruned
+// by a runtime filter? Takes _prune_mutex in shared mode so it can run
+// concurrently with other readers.
+bool RuntimeFilterPartitionPruner::is_partition_pruned(int64_t partition_id) const {
+    std::shared_lock lock(_prune_mutex);
+    return _pruned_partition_ids.contains(partition_id);
+}
+
+// Thread-safe count of partitions currently marked as pruned; takes
+// _prune_mutex in shared mode like is_partition_pruned().
+int64_t RuntimeFilterPartitionPruner::pruned_partition_count() const {
+    std::shared_lock lock(_prune_mutex);
+    return static_cast<int64_t>(_pruned_partition_ids.size());
+}
+
+} // namespace doris
diff --git a/be/src/exec/runtime_filter/runtime_filter_partition_pruner.h b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.h
new file mode 100644
index 0000000..31712ae
--- /dev/null
+++ b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.h
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/global_types.h"
+#include "core/data_type/data_type.h"
+#include "exec/common/hash_table/phmap_fwd_decl.h"
+#include "exprs/vexpr_fwd.h"
+#include "storage/olap_scan_common.h"
+
+namespace doris {
+
+class SlotDescriptor;
+class VExprContext;
+struct TPartitionBoundary;
+struct TRuntimeFilterDesc;
+struct TTargetExprMonotonicity;
+
+// Parsed representation of one partition boundary for one slot column.
+struct ParsedBoundary {
+    // Id of the partition this boundary describes.
+    int64_t partition_id = 0;
+    // Slot of the partition column this boundary constrains.
+    SlotId slot_id = 0;
+    // Nullability flag used when (re)building ColumnValueRanges for this slot.
+    bool is_nullable = false;
+    // Value range this partition covers for the slot (RANGE bounds or LIST
+    // fixed values), stored as the type-erased ColumnValueRange variant.
+    ColumnValueRangeType boundary_cvr;
+    // True if the partition's value set is exactly {NULL} (e.g. LIST
+    // partition whose only key is NULL). The CVR alone cannot encode
+    // "only NULL" -- it stays as the whole range with contain_null=true
+    // -- so we track it explicitly to enable accurate pruning.
+    bool only_null = false;
+    // True if the partition's value set includes NULL (covers both
+    // only-NULL and mixed LIST partitions). Tracked separately because
+    // ColumnValueRange::set_contain_null(true) destructively clears the
+    // fixed-value set, so we cannot stash the NULL flag inside the CVR
+    // alongside the concrete values.
+    bool contains_null = false;
+    // For projected boundaries: true if the target_expr produced NULL on this
+    // partition's input value (cannot prune conservatively).
+    bool null_in_projection = false;
+};
+
+// Immutable, fragment-shared parse result of TPartitionBoundary list.
+//
+// Parsing is expensive (constructs VLiteral per literal, materializes a
+// ColumnPtr, builds ColumnValueRange) and depends only on plan-time data.
+// All pipeline instances of the same fragment share one parse, performed
+// in OperatorX::prepare() which is single-threaded fragment setup.
+class ParsedPartitionBoundaries {
+public:
+    ParsedPartitionBoundaries() = default;
+
+    // Build the parse result from the thrift `boundaries` list. Caller must
+    // ensure this is invoked at most once per instance (OperatorX::prepare()
+    // is the natural call site).
+    void parse(const std::vector<TPartitionBoundary>& boundaries,
+               const phmap::flat_hash_map<int, SlotDescriptor*>& slot_descs);
+
+    // True when no partition-column slot was parsed; pruning is a no-op then.
+    bool empty() const { return _partition_column_slot_ids.empty(); }
+    int64_t total_partitions() const { return _total_partition_count; }
+
+    const std::unordered_map<SlotId, std::vector<ParsedBoundary>>& slot_to_boundaries() const {
+        return _slot_to_boundaries;
+    }
+    const std::unordered_set<SlotId>& partition_column_slot_ids() const {
+        return _partition_column_slot_ids;
+    }
+
+    // Lazily compute projected boundaries for a non-identity monotonic target.
+    // `target_expr` is `impl->children()[0]` of the RF wrapper (a sub-tree of
+    // the conjunct). `leaf_slot_id` is the unique VSlotRef leaf inside it
+    // (FE asserted target_expr has exactly one input slot). `leaf_column_id`
+    // is that slot ref's `column_id()` -- the position in the runtime block.
+    // `direction` is the FE-side cumulative monotonicity. `ctx` is the
+    // conjunct's VExprContext (used to execute the sub-expression).
+    //
+    // Returns nullptr if projection is unsupported (e.g. non-RANGE partition,
+    // unsupported primitive type) -- caller then skips this RF.
+    //
+    // Direction:
+    //   MONOTONIC_INCREASING: projected lo and hi keep their roles
+    //   MONOTONIC_DECREASING: swap (projected lo, hi) -> (hi, lo)
+    //
+    // NULL: any input value that projects to NULL marks
+    // null_in_projection=true; the existing _try_prune_by_single_rf
+    // conservatively keeps the partition.
+    const std::vector<ParsedBoundary>* get_or_compute_projection(
+            int filter_id, const VExprSPtr& target_expr, SlotId leaf_slot_id, int leaf_column_id,
+            TTargetExprMonotonicity::type direction, VExprContext* ctx) const;
+
+private:
+    // Per-slot boundary list: one ParsedBoundary per selected partition.
+    std::unordered_map<SlotId, std::vector<ParsedBoundary>> _slot_to_boundaries;
+    // Slot ids of partition columns seen during parse; empty() keys off this.
+    std::unordered_set<SlotId> _partition_column_slot_ids;
+    // Number of partitions represented in the parse (see total_partitions()).
+    int64_t _total_partition_count = 0;
+    // Slot id -> data type captured at parse time from the slot descriptors.
+    std::unordered_map<SlotId, DataTypePtr> _slot_data_types;
+
+    // Guards _projection_cache: projections are computed lazily and may be
+    // requested from concurrent pipeline instances.
+    mutable std::mutex _projection_cache_mutex;
+    mutable std::unordered_map<int /*filter_id*/, std::vector<ParsedBoundary>> _projection_cache;
+};
+
+// Per-instance pruning state for runtime-filter partition pruning.
+//
+// Holds the set of partition IDs already pruned for this scan instance and
+// nothing else: the parsed boundaries are shared per-fragment and reached via
+// `OperatorXBase::parsed_partition_boundaries()`. The owner (ScanLocalStateBase)
+// passes the parsed object into `prune_by_runtime_filters` on each call.
+//
+// Thread safety: `is_partition_pruned()` is safe to call concurrently with
+// `prune_by_runtime_filters()` via an internal shared_mutex.
+class RuntimeFilterPartitionPruner {
+public:
+    RuntimeFilterPartitionPruner() = default;
+
+    // Evaluate RF conjuncts against the given parsed boundaries and mark
+    // pruned partitions on this per-instance state. Returns the number of
+    // *newly* pruned partitions in this call.
+    int64_t prune_by_runtime_filters(const ParsedPartitionBoundaries& parsed,
+                                     const VExprContextSPtrs& conjuncts,
+                                     const std::vector<TRuntimeFilterDesc>& rf_descs,
+                                     int scan_node_id);
+
+    // Thread-safe query: is the given partition_id pruned?
+    bool is_partition_pruned(int64_t partition_id) const;
+
+    // Number of partitions currently marked as pruned.
+    int64_t pruned_partition_count() const;
+
+private:
+    // Partition ids pruned so far; guarded by _prune_mutex (shared for reads,
+    // exclusive for the batched insert in prune_by_runtime_filters).
+    phmap::flat_hash_set<int64_t> _pruned_partition_ids;
+    mutable std::shared_mutex _prune_mutex;
+
+    // Try to prune partitions using a single RF's impl expression on the given boundaries.
+    // Adds newly pruned partition IDs to `newly_pruned`.
+    void _try_prune_by_single_rf(const std::vector<ParsedBoundary>& boundaries,
+                                 const VExprSPtr& impl,
+                                 phmap::flat_hash_set<int64_t>& newly_pruned);
+};
+
+} // namespace doris
diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp
index 96e3e12..57af3ee 100644
--- a/be/src/exec/scan/olap_scanner.cpp
+++ b/be/src/exec/scan/olap_scanner.cpp
@@ -610,6 +610,13 @@
return Status::OK();
}
+// Returns true if a runtime filter has pruned the partition that this
+// scanner's tablet belongs to. Falls back to false when no local state is
+// attached, so the scanner is never skipped spuriously.
+bool OlapScanner::check_partition_pruned() const {
+    if (!_local_state) {
+        return false;
+    }
+    return _local_state->is_partition_pruned(_tablet_reader_params.tablet->partition_id());
+}
+
doris::TabletStorageType OlapScanner::get_storage_type() {
if (config::is_cloud_mode()) {
// we don't have cold storage in cloud mode, all storage is treated as local
diff --git a/be/src/exec/scan/olap_scanner.h b/be/src/exec/scan/olap_scanner.h
index 2187be4..ab84525 100644
--- a/be/src/exec/scan/olap_scanner.h
+++ b/be/src/exec/scan/olap_scanner.h
@@ -77,6 +77,8 @@
doris::TabletStorageType get_storage_type() override;
+ bool check_partition_pruned() const override;
+
void update_realtime_counters() override;
protected:
diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h
index 14d1b9e..bf38dda 100644
--- a/be/src/exec/scan/scanner.h
+++ b/be/src/exec/scan/scanner.h
@@ -181,6 +181,10 @@
return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
}
+ // Returns true if this scanner's partition has been pruned by a runtime filter.
+ // Overridden by OlapScanner to check partition pruning state.
+ virtual bool check_partition_pruned() const { return false; }
+
bool need_to_close() const { return _need_to_close; }
void mark_to_need_to_close() {
diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp
index 7abe192..5296fa4 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -203,6 +203,9 @@
<< rf_status.to_string();
}
+ // After processing late RFs, check if this scanner's partition was pruned.
+ if (!eos && scanner->check_partition_pruned()) { eos = true; }
+
size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
if (ctx->low_memory_mode()) {
ctx->clear_free_blocks();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 5192df1..9807ce5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -22,6 +22,9 @@
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionType;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
@@ -33,6 +36,7 @@
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.DistributionMode;
import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.ScanNode;
@@ -41,6 +45,7 @@
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
import org.apache.doris.thrift.TRuntimeFilterType;
+import org.apache.doris.thrift.TTargetExprMonotonicity;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -170,6 +175,7 @@
List<Expr> targetExprList = new ArrayList<>();
List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new ArrayList<>();
List<ScanNode> scanNodeList = new ArrayList<>();
+ List<RuntimeFilter> targetFilterList = new ArrayList<>();
boolean hasInvalidTarget = false;
for (RuntimeFilter filter : group) {
Slot curTargetSlot = filter.getTargetSlot();
@@ -204,6 +210,7 @@
TupleId targetTupleId = targetSlotRef.getDesc().getParentId();
SlotId targetSlotId = targetSlotRef.getSlotId();
scanNodeList.add(scanNode);
+ targetFilterList.add(filter);
targetExprList.add(targetExpr);
targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)));
}
@@ -228,6 +235,9 @@
Expr targetExpr = targetExprList.get(i);
origFilter.addTarget(new RuntimeFilterTarget(
scanNode, targetExpr, true, isLocalTarget));
+ TTargetExprMonotonicity mono = classifyMonotonicityForPartitionPruning(
+ targetFilterList.get(i).getTargetExpression(), scanNode);
+ origFilter.setTargetMonotonicity(scanNode.getId(), mono);
}
origFilter.setBitmapFilterNotIn(head.isBitmapFilterNotIn());
origFilter.setBloomFilterSizeCalculatedByNdv(head.isBloomFilterSizeCalculatedByNdv());
@@ -322,6 +332,9 @@
Expr targetExpr = targetExprList.get(i);
origFilter.addTarget(new RuntimeFilterTarget(
scanNode, targetExpr, true, isLocalTarget));
+ TTargetExprMonotonicity mono = classifyMonotonicityForPartitionPruning(
+ filter.getTargetExpressions().get(i), scanNode);
+ origFilter.setTargetMonotonicity(scanNode.getId(), mono);
}
origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
origFilter.setBloomFilterSizeCalculatedByNdv(filter.isBloomFilterSizeCalculatedByNdv());
@@ -350,4 +363,43 @@
origFilter.extractTargetsPosition();
return origFilter;
}
+
+    /**
+     * Classify whether a runtime-filter target expression can drive partition
+     * pruning on the given scan node.
+     *
+     * <p>Conservative implementation: only identity {@link Slot} references on a
+     * partition column are treated as monotonic increasing. Although Nereids
+     * exposes a {@code Monotonic} interface, its
+     * {@code isMonotonic(Literal lower, Literal upper)} contract is range-aware
+     * and most current implementations do not yet take the bounds into account
+     * correctly; treating every {@code instanceof Monotonic} node as
+     * unconditionally monotonic over the full domain would be unsound. Once the
+     * {@code Monotonic} implementations are completed/audited we can revisit
+     * this and walk through monotonic chains here.
+     *
+     * @param nereidsExpr the runtime filter's target expression in Nereids form
+     * @param scanNode the target scan node; only an {@code OlapScanNode} on a
+     *                 RANGE/LIST partitioned table can qualify
+     * @return {@code MONOTONIC_INCREASING} when the target is an identity slot
+     *         reference whose name matches a partition column
+     *         (case-insensitive); {@code NON_MONOTONIC} otherwise
+     */
+    static TTargetExprMonotonicity classifyMonotonicityForPartitionPruning(
+            Expression nereidsExpr, org.apache.doris.planner.PlanNode scanNode) {
+        if (!(scanNode instanceof OlapScanNode)) {
+            return TTargetExprMonotonicity.NON_MONOTONIC;
+        }
+        if (!(nereidsExpr instanceof Slot)) {
+            return TTargetExprMonotonicity.NON_MONOTONIC;
+        }
+        OlapTable table = ((OlapScanNode) scanNode).getOlapTable();
+        if (table == null) {
+            return TTargetExprMonotonicity.NON_MONOTONIC;
+        }
+        PartitionType partType = table.getPartitionInfo().getType();
+        if (partType != PartitionType.RANGE && partType != PartitionType.LIST) {
+            return TTargetExprMonotonicity.NON_MONOTONIC;
+        }
+        String colName = ((Slot) nereidsExpr).getName();
+        for (Column partCol : table.getPartitionInfo().getPartitionColumns()) {
+            if (partCol.getName().equalsIgnoreCase(colName)) {
+                return TTargetExprMonotonicity.MONOTONIC_INCREASING;
+            }
+        }
+        return TTargetExprMonotonicity.NON_MONOTONIC;
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 8a65687..d60a29c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -21,6 +21,8 @@
import org.apache.doris.analysis.ExprToSqlVisitor;
import org.apache.doris.analysis.ExprToThriftVisitor;
import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.MaxLiteral;
+import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
@@ -36,13 +38,16 @@
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.IndexToThriftConvertor;
+import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Tablet;
@@ -66,12 +71,14 @@
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TNormalizedOlapScanNode;
import org.apache.doris.thrift.TNormalizedPlanNode;
import org.apache.doris.thrift.TOlapScanNode;
import org.apache.doris.thrift.TOlapTableIndex;
import org.apache.doris.thrift.TPaloScanRange;
+import org.apache.doris.thrift.TPartitionBoundary;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TPrimitiveType;
@@ -1211,9 +1218,169 @@
msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds));
+ // Populate partition boundaries for BE-side runtime filter partition pruning.
+ // Only serialize when this scan node actually has at least one runtime
+ // filter whose target expression can drive partition pruning (identity
+ // or monotonic function on a partition column), so we don't bloat
+ // thrift for tables with many partitions but no usable RF target.
+ // Gated by session variable `enable_runtime_filter_partition_prune`.
+ ConnectContext rfPruneCtx = ConnectContext.get();
+ if (rfPruneCtx != null
+ && rfPruneCtx.getSessionVariable().isEnableRuntimeFilterPartitionPrune()
+ && hasRfDrivingPartitionPruning()) {
+ setPartitionBoundaries(msg.olap_scan_node);
+ }
+
super.toThrift(msg);
}
+    /**
+     * Whether any runtime filter targeting this scan node can drive BE-side
+     * partition pruning. Requires at least two selected partitions: with zero
+     * or one partition there is nothing worth pruning, so we avoid
+     * serializing boundary metadata at all.
+     */
+    private boolean hasRfDrivingPartitionPruning() {
+        if (selectedPartitionIds == null || selectedPartitionIds.size() < 2) {
+            return false;
+        }
+        PlanNodeId myId = this.getId();
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.canPrunePartitionsFor(myId)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Serialize per-partition boundary metadata into the TOlapScanNode for
+     * BE-side runtime filter partition pruning. Only RANGE and LIST
+     * partitioned tables are handled; partitions whose PartitionItem cannot
+     * be resolved are skipped, and nothing is set when no partition column
+     * maps to a slot in this scan's tuple descriptor.
+     */
+    private void setPartitionBoundaries(TOlapScanNode olapScanNode) {
+        PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+        PartitionType partType = partitionInfo.getType();
+        if (partType != PartitionType.RANGE && partType != PartitionType.LIST) {
+            return;
+        }
+        List<Column> partColumns = partitionInfo.getPartitionColumns();
+        if (partColumns.isEmpty()) {
+            return;
+        }
+
+        // Build partition column name -> slot id mapping (case-insensitive).
+        Map<String, Integer> partColToSlotId = Maps.newHashMap();
+        for (SlotDescriptor slot : desc.getSlots()) {
+            if (slot.getColumn() == null) {
+                continue;
+            }
+            for (Column partCol : partColumns) {
+                if (slot.getColumn().getName().equalsIgnoreCase(partCol.getName())) {
+                    partColToSlotId.put(partCol.getName(), slot.getId().asInt());
+                    break;
+                }
+            }
+        }
+        if (partColToSlotId.isEmpty()) {
+            return;
+        }
+
+        List<TPartitionBoundary> boundaries = new ArrayList<>();
+        for (Long partitionId : selectedPartitionIds) {
+            PartitionItem item = partitionInfo.getItem(partitionId);
+            if (item == null) {
+                continue;
+            }
+            if (item instanceof RangePartitionItem) {
+                addRangeBoundaries(boundaries, partitionId, (RangePartitionItem) item,
+                        partColumns, partColToSlotId);
+            } else if (item instanceof ListPartitionItem) {
+                addListBoundaries(boundaries, partitionId, (ListPartitionItem) item,
+                        partColumns, partColToSlotId);
+            }
+        }
+        if (!boundaries.isEmpty()) {
+            olapScanNode.setPartitionBoundaries(boundaries);
+        }
+    }
+
+    private void addRangeBoundaries(List<TPartitionBoundary> boundaries, long partitionId,
+            RangePartitionItem rangeItem, List<Column> partColumns,
+            Map<String, Integer> partColToSlotId) {
+        // We always project a (possibly multi-column) RANGE partition onto its
+        // first partition column, since the BE pruner only consumes per-column
+        // boundaries. Projection rules (lex compare semantics):
+        //
+        //   single column [L, U):
+        //       projection = [L, U)  → range_end_inclusive = false
+        //
+        //   multi-column [(L1, L2, ...), (U1, U2, ...)):
+        //       k1 = L1 is reachable (inner tuple can be ≥ (L2, ...))
+        //       k1 ∈ (L1, U1) is fully reachable
+        //       k1 = U1 may be reachable via inner tuple < (U2, ...)
+        //       projection = [L1, U1]  (CLOSED both ends, conservative)
+        //       → range_end_inclusive = true
+        //
+        // The half-open form [L1, U1) for multi-column would be a strict
+        // UNDER-approximation. Example: partition [(1,1), (1,5)) projects to
+        // {1}, but [1, 1) is empty and would let the BE wrongly prune the
+        // partition for an RF like k1 = 1.
+        String colName = partColumns.get(0).getName();
+        Integer slotId = partColToSlotId.get(colName);
+        if (slotId == null) {
+            return;
+        }
+        com.google.common.collect.Range<PartitionKey> range = rangeItem.getItems();
+        TPartitionBoundary boundary = new TPartitionBoundary();
+        boundary.setPartitionId(partitionId);
+        boundary.setSlotId(slotId);
+        // MIN_VALUE / MAXVALUE sentinel bounds are left unset -- presumably an
+        // absent range_start/range_end is read as "unbounded" by the BE
+        // boundary parser; verify against the BE-side parse() implementation.
+        if (range.hasLowerBound() && !range.lowerEndpoint().isMinValue()) {
+            LiteralExpr lower = range.lowerEndpoint().getKeys().get(0);
+            if (!(lower instanceof MaxLiteral)) {
+                boundary.setRangeStart(
+                        ExprToThriftVisitor.treeToThrift(lower).getNodes().get(0));
+            }
+        }
+        if (range.hasUpperBound() && !range.upperEndpoint().isMaxValue()) {
+            LiteralExpr upper = range.upperEndpoint().getKeys().get(0);
+            if (!(upper instanceof MaxLiteral)) {
+                boundary.setRangeEnd(
+                        ExprToThriftVisitor.treeToThrift(upper).getNodes().get(0));
+            }
+        }
+        if (partColumns.size() > 1) {
+            boundary.setRangeEndInclusive(true);
+        }
+        boundaries.add(boundary);
+    }
+
+    private void addListBoundaries(List<TPartitionBoundary> boundaries, long partitionId,
+            ListPartitionItem listItem, List<Column> partColumns,
+            Map<String, Integer> partColToSlotId) {
+        // The DEFAULT partition has no enumerable key set: emit no boundary
+        // for it, so the BE has nothing to prune it with.
+        if (listItem.isDefaultPartition()) {
+            return;
+        }
+        List<PartitionKey> partitionKeys = listItem.getItems();
+        // For LIST partitions, emit per-column distinct value sets. NULL keys
+        // are emitted as NULL_LITERAL TExprNode so the BE parser can translate
+        // them into ColumnValueRange::set_contain_null(true) rather than
+        // treating NULL as an ordinary fixed value (which would crash the
+        // typed value extractor in the parser).
+        // One TPartitionBoundary per partition column that maps to a slot;
+        // columns absent from the tuple descriptor are skipped.
+        for (int i = 0; i < partColumns.size(); i++) {
+            String colName = partColumns.get(i).getName();
+            Integer slotId = partColToSlotId.get(colName);
+            if (slotId == null) {
+                continue;
+            }
+            TPartitionBoundary boundary = new TPartitionBoundary();
+            boundary.setPartitionId(partitionId);
+            boundary.setSlotId(slotId);
+            List<TExprNode> listValues = new ArrayList<>(partitionKeys.size());
+            for (PartitionKey pk : partitionKeys) {
+                LiteralExpr literalExpr = pk.getKeys().get(i);
+                if (literalExpr.isNullLiteral()) {
+                    listValues.add(ExprToThriftVisitor.treeToThrift(
+                            NullLiteral.create(literalExpr.getType())).getNodes().get(0));
+                } else {
+                    listValues.add(
+                            ExprToThriftVisitor.treeToThrift(literalExpr).getNodes().get(0));
+                }
+            }
+            boundary.setListValues(listValues);
+            boundaries.add(boundary);
+        }
+    }
+
@Override
public void normalize(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) {
TNormalizedOlapScanNode normalizedOlapScanNode = new TNormalizedOlapScanNode();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index f4ea16c..4eb152f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -31,6 +31,7 @@
import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
import org.apache.doris.thrift.TRuntimeFilterDesc;
import org.apache.doris.thrift.TRuntimeFilterType;
+import org.apache.doris.thrift.TTargetExprMonotonicity;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -39,6 +40,7 @@
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -133,6 +135,15 @@
private boolean singleEq = false;
+ // Per-target monotonicity for BE-side runtime-filter partition pruning,
+ // keyed by the target scan node's plan id. Filled by the Nereids
+ // RuntimeFilterTranslator at translation time (where the original
+ // Nereids Expression is still available for monotonicity classification);
+ // legacy planner cannot redo the classification because it only has
+ // analysis.Expr. Read by toThrift() and canPrunePartitionsFor().
+ private final Map<PlanNodeId, TTargetExprMonotonicity> targetMonotonicityByScanId
+ = new HashMap<>();
+
/**
* Internal representation of a runtime filter target.
*/
@@ -310,9 +321,52 @@
} else if (builderNode instanceof SetOperationNode) {
tFilter.setNullAware(true);
}
+
+ // Per-target monotonicity for BE-side partition pruning. Populated
+ // upstream by RuntimeFilterTranslator (where the original Nereids
+ // Expression is available); we only forward it to the BE here.
+ // Gated by session variable `enable_runtime_filter_partition_prune`.
+ ConnectContext rfPruneCtx = ConnectContext.get();
+ boolean enableRfPartitionPrune = rfPruneCtx != null
+ && rfPruneCtx.getSessionVariable().isEnableRuntimeFilterPartitionPrune();
+ if (enableRfPartitionPrune && !targetMonotonicityByScanId.isEmpty()) {
+ Map<Integer, TTargetExprMonotonicity> monoMap = new HashMap<>();
+ for (Map.Entry<PlanNodeId, TTargetExprMonotonicity> e
+ : targetMonotonicityByScanId.entrySet()) {
+ if (e.getValue() != TTargetExprMonotonicity.NON_MONOTONIC) {
+ monoMap.put(e.getKey().asInt(), e.getValue());
+ }
+ }
+ if (!monoMap.isEmpty()) {
+ tFilter.setPlanIdToTargetMonotonicity(monoMap);
+ }
+ }
+
return tFilter;
}
+    /**
+     * Record monotonicity for a target. Called by RuntimeFilterTranslator after
+     * the corresponding RuntimeFilterTarget has been added.
+     *
+     * @param scanNodeId plan id of the target scan node (map key)
+     * @param m classified monotonicity of this RF's target expression on that node
+     */
+    public void setTargetMonotonicity(PlanNodeId scanNodeId, TTargetExprMonotonicity m) {
+        targetMonotonicityByScanId.put(scanNodeId, m);
+    }
+
+    /**
+     * Returns true iff this RF can drive partition pruning for the given target
+     * scan node. Used by OlapScanNode.toThrift to decide whether it is worth
+     * serializing partition_boundaries to BE. Mirrors exactly the condition
+     * that toThrift() emits planId_to_target_monotonicity for: monotonic and
+     * grounded on a partition column. When BE later supports more shapes, the
+     * single source of truth for that decision lives in
+     * RuntimeFilterTranslator.classifyMonotonicityForPartitionPruning.
+     *
+     * @param scanNodeId plan id of the candidate target scan node
+     * @return true when a recorded entry exists and is not NON_MONOTONIC
+     */
+    public boolean canPrunePartitionsFor(PlanNodeId scanNodeId) {
+        TTargetExprMonotonicity m = targetMonotonicityByScanId.get(scanNodeId);
+        return m != null && m != TTargetExprMonotonicity.NON_MONOTONIC;
+    }
+
public boolean hasTargets() {
return !targets.isEmpty();
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 843087f..91ad555 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -696,6 +696,49 @@
6: optional string bloom_literal
}
+// Partition boundary descriptor for BE-side runtime filter partition pruning.
+// FE sends only partitions that are candidates for pruning; partitions FE does
+// not want pruned (e.g. default catch-all partitions) are simply omitted.
+//
+// Partition type is inferred from which optional fields are set:
+// - range_start / range_end set → Range partition
+// - list_values set → List partition
+//
+// For Range partitions:
+// - range_start absent → no lower-bound constraint (negative infinity)
+// - range_end absent → no upper-bound constraint (MAXVALUE / positive infinity)
+struct TPartitionBoundary {
+    // id of the partition this boundary describes; used together with the
+    // partition_to_tablets map in TPaloScanRange-side metadata
+    1: optional Types.TPartitionId partition_id
+    // slot_id of the partition column
+    2: optional Types.TSlotId slot_id
+
+    // Range partition: closed lower bound; absent means unbounded below
+    3: optional Exprs.TExprNode range_start
+    // Range partition: upper bound. By default (range_end_inclusive=false) the
+    // bound is OPEN, i.e. the column range is [range_start, range_end), which
+    // matches Doris RANGE partition's `VALUES [..., ...)` syntax for the
+    // single-column case. Absent means unbounded above (MAXVALUE).
+    4: optional Exprs.TExprNode range_end
+
+    // List partition: set of concrete values in this partition. A NULL_LITERAL
+    // entry indicates the partition logically contains NULL rows for this column;
+    // the BE pruner translates that into ColumnValueRange::set_contain_null(true)
+    // rather than treating it as an ordinary fixed value.
+    5: optional list<Exprs.TExprNode> list_values
+
+    // When true, treat `range_end` as a CLOSED upper bound, i.e. the projected
+    // column range is [range_start, range_end]. Used when projecting a
+    // multi-column RANGE partition onto its first column: a partition like
+    // [(L1, L2, ...), (U1, U2, ...)) projects to the first column as
+    // [L1, U1] (both ends closed) — for the L1 == U1 case the projection is the
+    // singleton {L1}, and for L1 < U1 the value U1 is reachable via inner-tuple
+    // values of the second+ column. The original half-open form [L1, U1) would
+    // be a strict UNDER-approximation and could wrongly prune the partition.
+    // The single-column case keeps the default open form to preserve exact
+    // Doris partition semantics.
+    6: optional bool range_end_inclusive = false
+}
+
struct TMetaScanRange {
1: optional Types.TMetadataType metadata_type
2: optional TIcebergMetadataParams iceberg_params // deprecated
@@ -947,6 +990,13 @@
24: optional bool enable_mor_value_predicate_pushdown
// Read MOR table as DUP table: skip merge, skip delete sign
25: optional bool read_mor_as_dup
+ // Partition-id → tablet-id list mapping; used together with partition_boundaries
+ // for BE-side runtime filter partition pruning.
+ 26: optional map<Types.TPartitionId, list<Types.TTabletId>> partition_to_tablets
+ // Partition boundary descriptors for BE-side runtime filter partition pruning.
+ // Only partitions that are candidates for pruning are included; partitions FE
+ // does not want pruned (e.g. default catch-all) are omitted from this list.
+ 27: optional list<TPartitionBoundary> partition_boundaries
}
struct TEqJoinCondition {
@@ -1429,6 +1479,18 @@
MIN_MAX = 4
}
+// Monotonicity of a runtime filter's target expression, used by BE-side
+// partition pruning. For Range partitions, only MONOTONIC_INCREASING or
+// MONOTONIC_DECREASING target expressions allow safe boundary transformation
+// and pruning. List partitions can always be pruned regardless of monotonicity.
+// For a plain SlotRef target, FE sets MONOTONIC_INCREASING (identity is
+// trivially monotonic) — see planId_to_target_monotonicity. BE treats an
+// absent entry as NON_MONOTONIC (conservative), so an omitted value simply
+// disables Range-partition pruning for that target.
+enum TTargetExprMonotonicity {
+    NON_MONOTONIC = 0,
+    MONOTONIC_INCREASING = 1,
+    MONOTONIC_DECREASING = 2
+}
+
struct TTopnFilterDesc {
// topn node id
1: required i32 source_node_id
@@ -1495,6 +1557,14 @@
16: optional bool sync_filter_size; // Deprecated
17: optional bool build_bf_by_runtime_size;
+
+ // Per-target monotonicity for BE-side partition pruning, keyed by target
+ // plan-node ID. For Range partitions, only monotonic target expressions
+ // allow boundary transformation; List partitions can always be pruned
+ // regardless. Absent entry / NON_MONOTONIC → BE skips Range partition
+ // pruning for that target. When the target expression is a plain SlotRef,
+ // FE should set MONOTONIC_INCREASING (identity is trivially monotonic).
+ 18: optional map<Types.TPlanNodeId, TTargetExprMonotonicity> planId_to_target_monotonicity;
}
diff --git a/regression-test/data/query_p0/runtime_filter/rf_partition_pruning.out b/regression-test/data/query_p0/runtime_filter/rf_partition_pruning.out
new file mode 100644
index 0000000..e4801bc
--- /dev/null
+++ b/regression-test/data/query_p0/runtime_filter/rf_partition_pruning.out
@@ -0,0 +1,126 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !range_int_in --
+1 10 a
+3 50 c
+5 90 e
+
+-- !range_int_minmax --
+1 10 a
+3 50 c
+5 90 e
+
+-- !range_date_in --
+1 2024-01-15 jan
+2 2024-02-20 feb
+3 2024-03-10 mar
+
+-- !range_date_minmax --
+1 2024-01-15 jan
+2 2024-02-20 feb
+3 2024-03-10 mar
+
+-- !list_str_in --
+1 Beijing a
+2 Beijing b
+3 Beijing c
+
+-- !list_int_in --
+1 1 a
+2 1 b
+5 3 e
+6 3 f
+
+-- !no_pruning --
+13 250 m
+18 350 r
+3 50 c
+8 150 h
+
+-- !range_int_agg --
+3 10 90
+
+-- !list_str_agg --
+Beijing 3
+
+-- !list_int_multi --
+3 2 c
+4 2 d
+7 4 g
+8 4 h
+
+-- !bigint_range_in --
+1 100 v0a
+11 250100 v5a
+12 275000 v5b
+2 25000 v0b
+
+-- !bigint_range_minmax --
+1 100 v0a
+11 250100 v5a
+12 275000 v5b
+2 25000 v0b
+
+-- !datetime_range_in --
+1 2024-02-15T10:00 jan
+
+-- !datetime_range_minmax --
+1 2024-02-15T10:00 jan
+
+-- !list_50_in --
+10 10 cat10
+25 25 cat25
+3 3 cat3
+40 40 cat40
+50 50 cat50
+
+-- !neg_range_in --
+7 -50 g
+8 -10 h
+
+-- !neg_range_minmax --
+7 -50 g
+8 -10 h
+
+-- !desc_dim_in --
+1 10 a
+3 50 c
+5 90 e
+
+-- !desc_dim_minmax --
+1 10 a
+3 50 c
+5 90 e
+
+-- !combined_rf --
+1 10 a
+3 50 c
+5 90 e
+
+-- !span_in --
+13 250 m
+3 50 c
+
+-- !span_minmax --
+13 250 m
+3 50 c
+
+-- !list_only_null_pruned --
+3 1 c
+4 1 d
+
+-- !list_only_null_minmax --
+3 1 c
+4 1 d
+
+-- !list_mixed_value_kept --
+2 5 b
+
+-- !list_mixed_nullsafe --
+1 \N
+
+-- !range_multi_first_col --
+1 1 1 a
+2 1 4 b
+3 1 5 c
+4 1 9 d
+
diff --git a/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy b/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy
new file mode 100644
index 0000000..8d3955e
--- /dev/null
+++ b/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy
@@ -0,0 +1,1348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.action.ProfileAction
+
+// Marked nonConcurrent: this suite asserts on FE query-profile counters
+// (TotalPartitionsForRFPruning / PartitionsPrunedByRuntimeFilter). Even
+// though each query embeds a unique UUID token to look up its own profile,
+// the FE profile list is bounded (max_query_profile_num) and shared across
+// sessions, so heavy parallel traffic could evict our profile before the
+// poller finds it. Running serially keeps the assertions deterministic.
+suite("rf_partition_pruning", "nonConcurrent") {
+ // Disable the legacy RuntimeFilterPruner: it strips RFs whose effectiveness
+ // cannot be statistically verified, and the small INSERT-only tables in
+ // this suite have no analyzed column stats, so the pruner would otherwise
+ // drop every RF and the partition-pruning counters under test would never
+ // populate.
+ sql "set enable_runtime_filter_prune=false;"
+
+ // ---- Profile utilities ----
+ def profileAction = new ProfileAction(context)
+
+    // Poll the FE profile list for the query whose SQL statement contains
+    // `token`. Retries up to 15 times with a 500 ms sleep between attempts
+    // (profiles are published asynchronously); returns the profile text, or
+    // "" if the profile never appears (callers' counter sums then read as 0).
+    def getProfileByToken = { String token ->
+        String profileContent = ""
+        for (int attempt = 0; attempt < 15; attempt++) {
+            List profileData = profileAction.getProfileList()
+            for (final def profileItem in profileData) {
+                if (profileItem["Sql Statement"].toString().contains(token)) {
+                    profileContent = profileAction.getProfile(profileItem["Profile ID"].toString())
+                    break
+                }
+            }
+            if (profileContent != "") break
+            Thread.sleep(500)
+        }
+        return profileContent
+    }
+
+    // Sum every "- <counterName>: <n>" occurrence across all fragments /
+    // instances in the profile text; returns 0 when the counter is absent
+    // (including the empty-profile case from getProfileByToken).
+    def extractCounterSum = { String profileText, String counterName ->
+        def values = (profileText =~ /-\s*${counterName}:\s*(\d+)/).collect { it[1].toLong() }
+        return values.isEmpty() ? 0L : values.sum()
+    }
+
+    // Run a join query with a unique token to capture profile, then assert pruning counters.
+    // rfType: runtime_filter_type hint value
+    // expectedTotal: expected TotalPartitionsForRFPruning (minimum)
+    // expectedPruned: expected PartitionsPrunedByRuntimeFilter (minimum, 0 means exactly 0)
+    // The random UUID is selected as a constant column purely so this exact
+    // statement can be located again in the FE profile list.
+    def assertPruningProfile = { String queryBody, String rfType, long expectedTotal, long expectedPruned ->
+        def token = UUID.randomUUID().toString()
+        sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='${rfType}') */ "${token}", ${queryBody}
+        """
+        def profile = getProfileByToken(token)
+        def total = extractCounterSum(profile, "TotalPartitionsForRFPruning")
+        def pruned = extractCounterSum(profile, "PartitionsPrunedByRuntimeFilter")
+        logger.info("Profile [${token}]: total=${total}, pruned=${pruned}")
+        assertTrue(total >= expectedTotal, "TotalPartitionsForRFPruning: expected >= ${expectedTotal}, got ${total}")
+        if (expectedPruned == 0) {
+            assertTrue(pruned == 0, "PartitionsPrunedByRuntimeFilter: expected 0, got ${pruned}")
+        } else {
+            assertTrue(pruned >= expectedPruned, "PartitionsPrunedByRuntimeFilter: expected >= ${expectedPruned}, got ${pruned}")
+        }
+    }
+
+ // ============================================================
+ // Setup: Range-partitioned fact table (INT partition column)
+ // ============================================================
+ sql "drop table if exists rf_prune_range_int"
+ sql """
+ CREATE TABLE rf_prune_range_int (
+ id INT NOT NULL,
+ part_col INT NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY RANGE(part_col) (
+ PARTITION p1 VALUES [("0"), ("100")),
+ PARTITION p2 VALUES [("100"), ("200")),
+ PARTITION p3 VALUES [("200"), ("300")),
+ PARTITION p4 VALUES [("300"), ("400"))
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES("replication_num" = "1")
+ """
+
+ sql "drop table if exists rf_prune_dim_int"
+ sql """
+ CREATE TABLE rf_prune_dim_int (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+
+ sql """INSERT INTO rf_prune_range_int VALUES
+ (1, 10, 'a'), (2, 20, 'b'), (3, 50, 'c'), (4, 80, 'd'), (5, 90, 'e'),
+ (6, 110, 'f'), (7, 120, 'g'), (8, 150, 'h'), (9, 180, 'i'), (10, 190, 'j'),
+ (11, 210, 'k'), (12, 220, 'l'), (13, 250, 'm'), (14, 280, 'n'), (15, 290, 'o'),
+ (16, 310, 'p'), (17, 320, 'q'), (18, 350, 'r'), (19, 380, 's'), (20, 390, 't')
+ """
+
+ sql """INSERT INTO rf_prune_dim_int VALUES (10, 'x'), (50, 'y'), (90, 'z')"""
+
+ // ============================================================
+ // Setup: Range-partitioned fact table (DATE partition column)
+ // ============================================================
+ sql "drop table if exists rf_prune_range_date"
+ sql """
+ CREATE TABLE rf_prune_range_date (
+ id INT NOT NULL,
+ dt DATE NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY RANGE(dt) (
+ PARTITION p2024q1 VALUES [("2024-01-01"), ("2024-04-01")),
+ PARTITION p2024q2 VALUES [("2024-04-01"), ("2024-07-01")),
+ PARTITION p2024q3 VALUES [("2024-07-01"), ("2024-10-01")),
+ PARTITION p2024q4 VALUES [("2024-10-01"), ("2025-01-01"))
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES("replication_num" = "1")
+ """
+
+ sql "drop table if exists rf_prune_dim_date"
+ sql """
+ CREATE TABLE rf_prune_dim_date (
+ dim_dt DATE NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_dt) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+
+ sql """INSERT INTO rf_prune_range_date VALUES
+ (1, '2024-01-15', 'jan'), (2, '2024-02-20', 'feb'), (3, '2024-03-10', 'mar'),
+ (4, '2024-04-05', 'apr'), (5, '2024-05-15', 'may'), (6, '2024-06-20', 'jun'),
+ (7, '2024-07-10', 'jul'), (8, '2024-08-15', 'aug'), (9, '2024-09-20', 'sep'),
+ (10, '2024-10-05', 'oct'), (11, '2024-11-15', 'nov'), (12, '2024-12-20', 'dec')
+ """
+
+ sql """INSERT INTO rf_prune_dim_date VALUES
+ ('2024-01-15', 'x'), ('2024-02-20', 'y'), ('2024-03-10', 'z')
+ """
+
+ // ============================================================
+ // Setup: List-partitioned fact table (VARCHAR partition column)
+ // ============================================================
+ sql "drop table if exists rf_prune_list_str"
+ sql """
+ CREATE TABLE rf_prune_list_str (
+ id INT NOT NULL,
+ city VARCHAR(32) NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY LIST(city) (
+ PARTITION p_bj VALUES IN ("Beijing"),
+ PARTITION p_sh VALUES IN ("Shanghai"),
+ PARTITION p_gz VALUES IN ("Guangzhou"),
+ PARTITION p_sz VALUES IN ("Shenzhen")
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES("replication_num" = "1")
+ """
+
+ sql "drop table if exists rf_prune_dim_city"
+ sql """
+ CREATE TABLE rf_prune_dim_city (
+ dim_city VARCHAR(32) NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_city) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+
+ sql """INSERT INTO rf_prune_list_str VALUES
+ (1, 'Beijing', 'a'), (2, 'Beijing', 'b'), (3, 'Beijing', 'c'),
+ (4, 'Shanghai', 'd'), (5, 'Shanghai', 'e'), (6, 'Shanghai', 'f'),
+ (7, 'Guangzhou', 'g'), (8, 'Guangzhou', 'h'), (9, 'Guangzhou', 'i'),
+ (10, 'Shenzhen', 'j'), (11, 'Shenzhen', 'k'), (12, 'Shenzhen', 'l')
+ """
+
+ sql """INSERT INTO rf_prune_dim_city VALUES ('Beijing', 'x')"""
+
+ // ============================================================
+ // Setup: List-partitioned fact table (INT partition column)
+ // ============================================================
+ sql "drop table if exists rf_prune_list_int"
+ sql """
+ CREATE TABLE rf_prune_list_int (
+ id INT NOT NULL,
+ region_id INT NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY LIST(region_id) (
+ PARTITION p_r1 VALUES IN ("1"),
+ PARTITION p_r2 VALUES IN ("2"),
+ PARTITION p_r3 VALUES IN ("3"),
+ PARTITION p_r4 VALUES IN ("4"),
+ PARTITION p_r5 VALUES IN ("5")
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 2
+ PROPERTIES("replication_num" = "1")
+ """
+
+ sql "drop table if exists rf_prune_dim_region"
+ sql """
+ CREATE TABLE rf_prune_dim_region (
+ dim_region INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_region) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+
+ sql """INSERT INTO rf_prune_list_int VALUES
+ (1, 1, 'a'), (2, 1, 'b'),
+ (3, 2, 'c'), (4, 2, 'd'),
+ (5, 3, 'e'), (6, 3, 'f'),
+ (7, 4, 'g'), (8, 4, 'h'),
+ (9, 5, 'i'), (10, 5, 'j')
+ """
+
+ sql """INSERT INTO rf_prune_dim_region VALUES (1, 'x'), (3, 'y')"""
+
+ // ============================================================
+ // Session settings
+ // ============================================================
+ sql "set runtime_filter_type='IN_OR_BLOOM_FILTER,MIN_MAX'"
+ sql "set runtime_filter_wait_infinitely=true"
+ sql "set disable_join_reorder=true"
+ sql "set enable_profile=true"
+ sql "set profile_level=2"
+ // Fix parallelism to 1 so profile counters are deterministic across environments
+ sql "set parallel_pipeline_task_num=1"
+
+ // ============================================================
+ // Test 1: Range partition (INT) - IN filter prune
+ // RF {10, 50, 90} → only p1 [0,100) matches → prune 3 of 4
+ // ============================================================
+ order_qt_range_int_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_int f JOIN rf_prune_dim_int d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 4, 3)
+
+ // Test 2: Range partition (INT) - MinMax filter prune
+ // min=10, max=90 → only p1 matches → prune 3 of 4
+ order_qt_range_int_minmax """
+ SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_int f JOIN rf_prune_dim_int d ON f.part_col = d.dim_key",
+ "MIN_MAX", 4, 3)
+
+ // Test 3: Range partition (DATE) - IN filter prune
+ // Q1 dates only → prune Q2, Q3, Q4 → prune 3 of 4
+ order_qt_range_date_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.dt, f.value
+ FROM rf_prune_range_date f
+ JOIN rf_prune_dim_date d ON f.dt = d.dim_dt
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_date f JOIN rf_prune_dim_date d ON f.dt = d.dim_dt",
+ "IN_OR_BLOOM_FILTER", 4, 3)
+
+ // Test 4: Range partition (DATE) - MinMax filter prune
+ order_qt_range_date_minmax """
+ SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+ f.id, f.dt, f.value
+ FROM rf_prune_range_date f
+ JOIN rf_prune_dim_date d ON f.dt = d.dim_dt
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_date f JOIN rf_prune_dim_date d ON f.dt = d.dim_dt",
+ "MIN_MAX", 4, 3)
+
+ // Test 5: List partition (VARCHAR) - IN filter prune
+ // Only "Beijing" → prune 3 of 4
+ order_qt_list_str_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.city, f.value
+ FROM rf_prune_list_str f
+ JOIN rf_prune_dim_city d ON f.city = d.dim_city
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_list_str f JOIN rf_prune_dim_city d ON f.city = d.dim_city",
+ "IN_OR_BLOOM_FILTER", 4, 3)
+
+ // Test 6: List partition (INT) - IN filter prune
+ // Regions {1, 3} → prune 3 of 5
+ order_qt_list_int_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.region_id, f.value
+ FROM rf_prune_list_int f
+ JOIN rf_prune_dim_region d ON f.region_id = d.dim_region
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_list_int f JOIN rf_prune_dim_region d ON f.region_id = d.dim_region",
+ "IN_OR_BLOOM_FILTER", 5, 3)
+
+ // Test 7: No pruning - dim matches all partitions
+ sql "drop table if exists rf_prune_dim_all"
+ sql """
+ CREATE TABLE rf_prune_dim_all (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_all VALUES (50, 'a'), (150, 'b'), (250, 'c'), (350, 'd')"""
+
+ order_qt_no_pruning """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_all d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_int f JOIN rf_prune_dim_all d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 4, 0)
+
+ // Test 8: Join result correctness with aggregation
+ order_qt_range_int_agg """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ count(*), min(f.part_col), max(f.part_col)
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+ """
+
+ // Test 9: List partition with aggregation
+ order_qt_list_str_agg """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.city, count(*) as cnt
+ FROM rf_prune_list_str f
+ JOIN rf_prune_dim_city d ON f.city = d.dim_city
+ GROUP BY f.city
+ """
+
+ // Test 10: Multiple dim rows ({2, 4}) → prune 3 of 5
+ sql "drop table if exists rf_prune_dim_multi"
+ sql """
+ CREATE TABLE rf_prune_dim_multi (
+ dim_region INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_region) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_multi VALUES (2, 'a'), (4, 'b')"""
+
+ order_qt_list_int_multi """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.region_id, f.value
+ FROM rf_prune_list_int f
+ JOIN rf_prune_dim_multi d ON f.region_id = d.dim_region
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_list_int f JOIN rf_prune_dim_multi d ON f.region_id = d.dim_region",
+ "IN_OR_BLOOM_FILTER", 5, 3)
+
+ // ============================================================
+ // Extended tests: more types, more partitions, non-monotonic,
+ // negative ranges, boundary values
+ // ============================================================
+
+ // ---- Setup: BIGINT range, 20 partitions (0–1,000,000 step 50,000) ----
+ sql "drop table if exists rf_prune_range_bigint"
+ def bigintPartitions = (0..19).collect { i ->
+ def lo = i * 50000L
+ def hi = (i + 1) * 50000L
+ "PARTITION p${i} VALUES [(\"${lo}\"), (\"${hi}\"))"
+ }.join(",\n ")
+ sql """
+ CREATE TABLE rf_prune_range_bigint (
+ id INT NOT NULL,
+ bval BIGINT NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY RANGE(bval) (
+ ${bigintPartitions}
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES("replication_num" = "1")
+ """
+ // Insert 2 rows per partition
+ def bigintInsertVals = (0..19).collect { i ->
+ def v1 = i * 50000L + 100
+ def v2 = i * 50000L + 25000
+ "(${i * 2 + 1}, ${v1}, 'v${i}a'), (${i * 2 + 2}, ${v2}, 'v${i}b')"
+ }.join(",\n ")
+ sql "INSERT INTO rf_prune_range_bigint VALUES ${bigintInsertVals}"
+
+ // Dim: only values in p0 [0,50000) and p5 [250000,300000) → 18 pruned
+ sql "drop table if exists rf_prune_dim_bigint"
+ sql """
+ CREATE TABLE rf_prune_dim_bigint (
+ dim_key BIGINT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_bigint VALUES (100, 'x'), (25000, 'y'), (250100, 'z'), (275000, 'w')"""
+
+ // Test 11: BIGINT range (20 partitions) + IN → 18 pruned
+ order_qt_bigint_range_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.bval, f.value
+ FROM rf_prune_range_bigint f
+ JOIN rf_prune_dim_bigint d ON f.bval = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_bigint f JOIN rf_prune_dim_bigint d ON f.bval = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 20, 18)
+
+    // Test 12: BIGINT range (20 partitions) + MinMax → 14 pruned
+ // min=100, max=275000 → covers p0–p5 (6 partitions), so 14 pruned
+ order_qt_bigint_range_minmax """
+ SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+ f.id, f.bval, f.value
+ FROM rf_prune_range_bigint f
+ JOIN rf_prune_dim_bigint d ON f.bval = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_bigint f JOIN rf_prune_dim_bigint d ON f.bval = d.dim_key",
+ "MIN_MAX", 20, 14)
+
+ // ---- Setup: DATETIME range, 8 partitions (quarterly, 2024-2025) ----
+ sql "drop table if exists rf_prune_range_datetime"
+ sql """
+ CREATE TABLE rf_prune_range_datetime (
+ id INT NOT NULL,
+ ts DATETIME NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY RANGE(ts) (
+ PARTITION p2024q1 VALUES [("2024-01-01 00:00:00"), ("2024-04-01 00:00:00")),
+ PARTITION p2024q2 VALUES [("2024-04-01 00:00:00"), ("2024-07-01 00:00:00")),
+ PARTITION p2024q3 VALUES [("2024-07-01 00:00:00"), ("2024-10-01 00:00:00")),
+ PARTITION p2024q4 VALUES [("2024-10-01 00:00:00"), ("2025-01-01 00:00:00")),
+ PARTITION p2025q1 VALUES [("2025-01-01 00:00:00"), ("2025-04-01 00:00:00")),
+ PARTITION p2025q2 VALUES [("2025-04-01 00:00:00"), ("2025-07-01 00:00:00")),
+ PARTITION p2025q3 VALUES [("2025-07-01 00:00:00"), ("2025-10-01 00:00:00")),
+ PARTITION p2025q4 VALUES [("2025-10-01 00:00:00"), ("2026-01-01 00:00:00"))
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_range_datetime VALUES
+ (1, '2024-02-15 10:00:00', 'jan'), (2, '2024-05-20 12:00:00', 'may'),
+ (3, '2024-08-10 08:00:00', 'aug'), (4, '2024-11-25 14:00:00', 'nov'),
+ (5, '2025-02-01 09:00:00', 'feb25'), (6, '2025-05-15 11:00:00', 'may25'),
+ (7, '2025-08-20 16:00:00', 'aug25'), (8, '2025-11-10 13:00:00', 'nov25')
+ """
+
+ sql "drop table if exists rf_prune_dim_datetime"
+ sql """
+ CREATE TABLE rf_prune_dim_datetime (
+ dim_ts DATETIME NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_ts) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ // Only 2024 Q1 timestamps → 7 of 8 partitions pruned
+ sql """INSERT INTO rf_prune_dim_datetime VALUES
+ ('2024-02-15 10:00:00', 'x'), ('2024-03-01 00:00:00', 'y')
+ """
+
+ // Test 13: DATETIME range (8 partitions) + IN → 7 pruned
+ order_qt_datetime_range_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.ts, f.value
+ FROM rf_prune_range_datetime f
+ JOIN rf_prune_dim_datetime d ON f.ts = d.dim_ts
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_datetime f JOIN rf_prune_dim_datetime d ON f.ts = d.dim_ts",
+ "IN_OR_BLOOM_FILTER", 8, 7)
+
+ // Test 14: DATETIME range (8 partitions) + MinMax → 7 pruned
+ order_qt_datetime_range_minmax """
+ SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+ f.id, f.ts, f.value
+ FROM rf_prune_range_datetime f
+ JOIN rf_prune_dim_datetime d ON f.ts = d.dim_ts
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_datetime f JOIN rf_prune_dim_datetime d ON f.ts = d.dim_ts",
+ "MIN_MAX", 8, 7)
+
+ // ---- Setup: 50 INT list partitions ----
+ sql "drop table if exists rf_prune_list_50"
+ def list50Partitions = (1..50).collect { i ->
+ "PARTITION p_${i} VALUES IN (\"${i}\")"
+ }.join(",\n ")
+ sql """
+ CREATE TABLE rf_prune_list_50 (
+ id INT NOT NULL,
+ cat_id INT NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY LIST(cat_id) (
+ ${list50Partitions}
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES("replication_num" = "1")
+ """
+ def list50InsertVals = (1..50).collect { i ->
+ "(${i}, ${i}, 'cat${i}')"
+ }.join(", ")
+ sql "INSERT INTO rf_prune_list_50 VALUES ${list50InsertVals}"
+
+ sql "drop table if exists rf_prune_dim_50"
+ sql """
+ CREATE TABLE rf_prune_dim_50 (
+ dim_cat INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_cat) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ // Match 5 of 50 → 45 pruned
+ sql """INSERT INTO rf_prune_dim_50 VALUES (3, 'a'), (10, 'b'), (25, 'c'), (40, 'd'), (50, 'e')"""
+
+ // Test 15: 50 list partitions + IN → 45 pruned
+ order_qt_list_50_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.cat_id, f.value
+ FROM rf_prune_list_50 f
+ JOIN rf_prune_dim_50 d ON f.cat_id = d.dim_cat
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_list_50 f JOIN rf_prune_dim_50 d ON f.cat_id = d.dim_cat",
+ "IN_OR_BLOOM_FILTER", 50, 45)
+
+ // ---- Non-monotonic expression tests ----
+ // Test 16: Join on abs(part_col) — target expr is NOT a SlotRef → 0 pruned
+ // Using existing rf_prune_range_int (4 partitions), dim has value 50
+ sql "drop table if exists rf_prune_dim_abs"
+ sql """
+ CREATE TABLE rf_prune_dim_abs (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_abs VALUES (50, 'x')"""
+
+ // A unique token is projected in the select list so that
+ // getProfileByToken can locate this exact query's profile afterwards.
+ def token16 = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token16}", f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_abs d ON abs(f.part_col) = d.dim_key
+ """
+ def profile16 = getProfileByToken(token16)
+ def pruned16 = extractCounterSum(profile16, "PartitionsPrunedByRuntimeFilter")
+ logger.info("non_monotonic abs: pruned=${pruned16}")
+ assertTrue(pruned16 == 0, "Non-monotonic expr should not prune, got ${pruned16}")
+
+ // Test 17: Join on f.part_col % 100 — non-monotonic → 0 pruned
+ sql "drop table if exists rf_prune_dim_mod"
+ sql """
+ CREATE TABLE rf_prune_dim_mod (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_mod VALUES (10, 'x')"""
+
+ def token17 = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token17}", f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_mod d ON (f.part_col % 100) = d.dim_key
+ """
+ def profile17 = getProfileByToken(token17)
+ def pruned17 = extractCounterSum(profile17, "PartitionsPrunedByRuntimeFilter")
+ logger.info("non_monotonic mod: pruned=${pruned17}")
+ assertTrue(pruned17 == 0, "Non-monotonic mod expr should not prune, got ${pruned17}")
+
+ // Test 18: Join on non-partition column (f.id) → 0 partition pruning
+ sql "drop table if exists rf_prune_dim_nonpart"
+ sql """
+ CREATE TABLE rf_prune_dim_nonpart (
+ dim_id INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_id) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_nonpart VALUES (1, 'x'), (3, 'y')"""
+
+ def token18 = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token18}", f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_nonpart d ON f.id = d.dim_id
+ """
+ def profile18 = getProfileByToken(token18)
+ def pruned18 = extractCounterSum(profile18, "PartitionsPrunedByRuntimeFilter")
+ logger.info("non_partition_col: pruned=${pruned18}")
+ assertTrue(pruned18 == 0, "Join on non-partition col should not prune, got ${pruned18}")
+
+ // ---- Negative value range partitions ----
+ // Four partitions covering [-400, 0); two fact rows per partition.
+ sql "drop table if exists rf_prune_range_neg"
+ sql """
+ CREATE TABLE rf_prune_range_neg (
+ id INT NOT NULL,
+ neg_col INT NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY RANGE(neg_col) (
+ PARTITION pn4 VALUES [("-400"), ("-300")),
+ PARTITION pn3 VALUES [("-300"), ("-200")),
+ PARTITION pn2 VALUES [("-200"), ("-100")),
+ PARTITION pn1 VALUES [("-100"), ("0"))
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_range_neg VALUES
+ (1, -350, 'a'), (2, -310, 'b'),
+ (3, -250, 'c'), (4, -210, 'd'),
+ (5, -150, 'e'), (6, -110, 'f'),
+ (7, -50, 'g'), (8, -10, 'h')
+ """
+
+ sql "drop table if exists rf_prune_dim_neg"
+ sql """
+ CREATE TABLE rf_prune_dim_neg (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ // Only values in pn1 [-100, 0) → prune 3 of 4
+ sql """INSERT INTO rf_prune_dim_neg VALUES (-50, 'x'), (-10, 'y')"""
+
+ // Test 19: Negative range + IN → 3 pruned
+ order_qt_neg_range_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.neg_col, f.value
+ FROM rf_prune_range_neg f
+ JOIN rf_prune_dim_neg d ON f.neg_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_neg f JOIN rf_prune_dim_neg d ON f.neg_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 4, 3)
+
+ // Test 20: Negative range + MinMax → 3 pruned
+ // MinMax [-50,-10] lies entirely inside pn1, so pn2..pn4 are pruned.
+ order_qt_neg_range_minmax """
+ SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+ f.id, f.neg_col, f.value
+ FROM rf_prune_range_neg f
+ JOIN rf_prune_dim_neg d ON f.neg_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_neg f JOIN rf_prune_dim_neg d ON f.neg_col = d.dim_key",
+ "MIN_MAX", 4, 3)
+
+ // ---- Boundary value tests ----
+ // Dim has exact partition boundary values: 100, 200, 300
+ // 100 ∈ p2 [100,200), 200 ∈ p3 [200,300), 300 ∈ p4 [300,400)
+ // Only p1 [0,100) is pruned → 1 pruned
+ sql "drop table if exists rf_prune_dim_boundary"
+ sql """
+ CREATE TABLE rf_prune_dim_boundary (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_boundary VALUES (100, 'a'), (200, 'b'), (300, 'c')"""
+
+ def token21 = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token21}", f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_boundary d ON f.part_col = d.dim_key
+ """
+ def profile21 = getProfileByToken(token21)
+ def pruned21 = extractCounterSum(profile21, "PartitionsPrunedByRuntimeFilter")
+ def total21 = extractCounterSum(profile21, "TotalPartitionsForRFPruning")
+ logger.info("boundary_exact: total=${total21}, pruned=${pruned21}")
+ // p1 is pruned (no value 0-99 in dim), p2/p3/p4 match boundary values
+ // NOTE(review): asserted as >= 1 (not == 1), presumably to tolerate
+ // RF arrival timing across scanners — confirm before tightening.
+ assertTrue(pruned21 >= 1, "Boundary test: expected >= 1 pruned, got ${pruned21}")
+
+ // Test 22: Dim values outside all partitions → all pruned, empty result
+ sql "drop table if exists rf_prune_dim_outside"
+ sql """
+ CREATE TABLE rf_prune_dim_outside (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_outside VALUES (500, 'x'), (600, 'y'), (700, 'z')"""
+
+ def token22 = UUID.randomUUID().toString()
+ def result22 = sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token22}", f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_outside d ON f.part_col = d.dim_key
+ """
+ assertTrue(result22.isEmpty(), "Dim outside all partitions should return empty result")
+ def profile22 = getProfileByToken(token22)
+ def pruned22 = extractCounterSum(profile22, "PartitionsPrunedByRuntimeFilter")
+ logger.info("outside_all: pruned=${pruned22}")
+ // All 4 partitions should be pruned since RF IN {500,600,700} intersects none
+ assertTrue(pruned22 >= 4, "All partitions should be pruned, got ${pruned22}")
+
+ // ---- Descending dim value order test ----
+ // Verify that dim insertion order (desc vs asc) doesn't affect pruning
+ sql "drop table if exists rf_prune_dim_desc"
+ sql """
+ CREATE TABLE rf_prune_dim_desc (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ // Same values as rf_prune_dim_int {10, 50, 90} but inserted in descending order
+ sql """INSERT INTO rf_prune_dim_desc VALUES (90, 'z'), (50, 'y'), (10, 'x')"""
+
+ // Test 23: Descending dim order + IN → should still prune 3 of 4
+ order_qt_desc_dim_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_desc d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_int f JOIN rf_prune_dim_desc d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 4, 3)
+
+ // Test 24: Descending dim order + MinMax → should still prune 3 of 4
+ order_qt_desc_dim_minmax """
+ SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_desc d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_int f JOIN rf_prune_dim_desc d ON f.part_col = d.dim_key",
+ "MIN_MAX", 4, 3)
+
+ // ---- Combined RF types test ----
+ // Test 25: Both IN and MinMax filters simultaneously
+ // (either filter alone already prunes 3 of 4 here; the test pins that
+ // stacking both does not change the outcome)
+ order_qt_combined_rf """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER,MIN_MAX') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_int f JOIN rf_prune_dim_int d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER,MIN_MAX", 4, 3)
+
+ // ---- Multi-partition match with MinMax spanning multiple partitions ----
+ // Dim values span p1 and p3: {50, 250} → MinMax [50, 250] covers p1, p2, p3
+ // IN filter prunes p2, p4 (2 pruned). MinMax prunes only p4 (1 pruned).
+ sql "drop table if exists rf_prune_dim_span"
+ sql """
+ CREATE TABLE rf_prune_dim_span (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_span VALUES (50, 'x'), (250, 'y')"""
+
+ // Test 26: Spanning dim + IN → 2 pruned (p2, p4 don't contain 50 or 250)
+ order_qt_span_in """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_span d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_int f JOIN rf_prune_dim_span d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 4, 2)
+
+ // Test 27: Spanning dim + MinMax → [50,250] covers p1,p2,p3; only p4 pruned → 1 pruned
+ order_qt_span_minmax """
+ SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_span d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_int f JOIN rf_prune_dim_span d ON f.part_col = d.dim_key",
+ "MIN_MAX", 4, 1)
+
+ // ============================================================
+ // Tests 28-32: LIST partition with NULL key (only_null & mixed)
+ // ============================================================
+ sql "drop table if exists rf_prune_list_null"
+ sql """
+ CREATE TABLE rf_prune_list_null (
+ id INT NOT NULL,
+ part_col INT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY LIST(part_col) (
+ PARTITION p_null VALUES IN ((NULL)),
+ PARTITION p_one VALUES IN ("1"),
+ PARTITION p_two VALUES IN ("2"),
+ PARTITION p_three VALUES IN ("3")
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 2
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_list_null VALUES
+ (1, NULL, 'a'), (2, NULL, 'b'),
+ (3, 1, 'c'), (4, 1, 'd'),
+ (5, 2, 'e'),
+ (6, 3, 'f')"""
+
+ sql "drop table if exists rf_prune_dim_one"
+ sql """
+ CREATE TABLE rf_prune_dim_one (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_one VALUES (1, 'x')"""
+
+ // Test 28: Non-null-aware RF {1} on LIST{p_null,p_one,p_two,p_three}
+ // p_null is only_null → pruned (RF !contain NULL)
+ // p_two, p_three → pruned (no value match)
+ // p_one → kept
+ order_qt_list_only_null_pruned """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_list_null f
+ JOIN rf_prune_dim_one d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_list_null f JOIN rf_prune_dim_one d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 4, 3)
+
+ // Test 29: Same with MIN_MAX → MinMax [1,1] excludes p_null/p_two/p_three → 3 pruned
+ order_qt_list_only_null_minmax """
+ SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_list_null f
+ JOIN rf_prune_dim_one d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_list_null f JOIN rf_prune_dim_one d ON f.part_col = d.dim_key",
+ "MIN_MAX", 4, 3)
+
+ // Test 30 (setup): Mixed NULL+value partition. Build a list where one partition holds {NULL, 5}.
+ sql "drop table if exists rf_prune_list_mixed"
+ sql """
+ CREATE TABLE rf_prune_list_mixed (
+ id INT NOT NULL,
+ part_col INT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY LIST(part_col) (
+ PARTITION p_a VALUES IN ((NULL), ("5")),
+ PARTITION p_b VALUES IN (("10")),
+ PARTITION p_c VALUES IN (("20"))
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 2
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_list_mixed VALUES
+ (1, NULL, 'a'), (2, 5, 'b'),
+ (3, 10, 'c'),
+ (4, 20, 'd')"""
+
+ sql "drop table if exists rf_prune_dim_five"
+ sql """
+ CREATE TABLE rf_prune_dim_five (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_five VALUES (5, 'x')"""
+
+ // Test 30: Mixed partition {NULL,5} + RF {5} (non-null-aware) → p_a kept, p_b/p_c pruned
+ // Crucial regression: validates that concrete value 5 survives the NULL marker.
+ order_qt_list_mixed_value_kept """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.part_col, f.value
+ FROM rf_prune_list_mixed f
+ JOIN rf_prune_dim_five d ON f.part_col = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_list_mixed f JOIN rf_prune_dim_five d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 3, 2)
+
+ // Test 31: Mixed partition {NULL,5} + RF {7} (no value match, RF non-null-aware)
+ // p_a still pruned (NULL row can't match non-null RF; concrete 5 != 7)
+ // p_b, p_c pruned
+ sql "drop table if exists rf_prune_dim_seven"
+ sql """
+ CREATE TABLE rf_prune_dim_seven (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_seven VALUES (7, 'x')"""
+
+ def token_mixed_all = UUID.randomUUID().toString()
+ def res_mixed_all = sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token_mixed_all}", f.id
+ FROM rf_prune_list_mixed f
+ JOIN rf_prune_dim_seven d ON f.part_col = d.dim_key
+ """
+ assertTrue(res_mixed_all.isEmpty(), "RF {7} matches no row, expect empty")
+ assertPruningProfile(
+ "* FROM rf_prune_list_mixed f JOIN rf_prune_dim_seven d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 3, 3)
+
+ // Test 32: Null-safe equal join (<=>) on mixed partition. RF is null_aware AND
+ // contains NULL (build side has NULL key), so p_a (which contains NULL) MUST
+ // be kept even if RF concrete values miss 5.
+ sql "drop table if exists rf_prune_dim_null"
+ sql """
+ CREATE TABLE rf_prune_dim_null (
+ dim_key INT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_val) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_null VALUES (NULL, 'n')"""
+
+ // Result-only check (no profile assertion): correctness of the kept
+ // NULL row is what matters for the null-aware RF path.
+ order_qt_list_mixed_nullsafe """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.part_col
+ FROM rf_prune_list_mixed f
+ JOIN rf_prune_dim_null d ON f.part_col <=> d.dim_key
+ """
+
+ // ============================================================
+ // Tests 33-34: Multi-column RANGE projection (closed [L1, U1])
+ // ============================================================
+ // Critical: partition [(1,1),(1,5)) only contains rows whose first key == 1.
+ // Naively projecting to [1,1) (half-open) would mark it as empty and wrongly
+ // prune it when RF {1} probes. The fix uses closed [1,1].
+ sql "drop table if exists rf_prune_range_multi"
+ sql """
+ CREATE TABLE rf_prune_range_multi (
+ id INT NOT NULL,
+ k1 INT NOT NULL,
+ k2 INT NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY RANGE(k1, k2) (
+ PARTITION p_a VALUES [("1", "1"), ("1", "5")),
+ PARTITION p_b VALUES [("1", "5"), ("2", "0")),
+ PARTITION p_c VALUES [("2", "0"), ("3", "0"))
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 2
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_range_multi VALUES
+ (1, 1, 1, 'a'), (2, 1, 4, 'b'),
+ (3, 1, 5, 'c'), (4, 1, 9, 'd'),
+ (5, 2, 0, 'e'), (6, 2, 9, 'f')"""
+
+ // Test 33: RF {1} probing k1 → must keep p_a AND p_b (both span first-col=1),
+ // prune p_c. Validates closed-upper-bound projection.
+ // Reuses rf_prune_dim_one {1} created for Test 28.
+ order_qt_range_multi_first_col """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ f.id, f.k1, f.k2, f.value
+ FROM rf_prune_range_multi f
+ JOIN rf_prune_dim_one d ON f.k1 = d.dim_key
+ """
+ assertPruningProfile(
+ "* FROM rf_prune_range_multi f JOIN rf_prune_dim_one d ON f.k1 = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 3, 1)
+
+ // Test 34: MIN_MAX RF {1} → MinMax [1,1] still hits both p_a and p_b, prune p_c
+ assertPruningProfile(
+ "* FROM rf_prune_range_multi f JOIN rf_prune_dim_one d ON f.k1 = d.dim_key",
+ "MIN_MAX", 3, 1)
+
+ // ============================================================
+ // Tests 35-41: Type coverage
+ // ============================================================
+ // Each test creates a 4-partition RANGE table over a different type and
+ // joins with a 1-row dim that should match exactly one partition.
+ //   suffix   - table-name suffix (rf_prune_type_<suffix>)
+ //   partType - partition-column type of the fact table
+ //   dimType  - join-key type of the dim table
+ //   partLits - five '|'-separated partition boundary literals
+ //   dimVal   - dim value expected to land in exactly one partition
+ //   rowVals  - fact part_col literals spread across the partitions
+ def runTypeCoverage = { String suffix, String partType, String dimType,
+ String partLits, String dimVal, List<String> rowVals ->
+ // Split the boundary list once instead of re-splitting it for each
+ // of the eight boundary references below.
+ def bounds = partLits.split('\\|')
+ sql "drop table if exists rf_prune_type_${suffix}"
+ sql """
+ CREATE TABLE rf_prune_type_${suffix} (
+ id INT NOT NULL,
+ part_col ${partType} NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY RANGE(part_col) (
+ PARTITION p1 VALUES [(${bounds[0]}), (${bounds[1]})),
+ PARTITION p2 VALUES [(${bounds[1]}), (${bounds[2]})),
+ PARTITION p3 VALUES [(${bounds[2]}), (${bounds[3]})),
+ PARTITION p4 VALUES [(${bounds[3]}), (${bounds[4]}))
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 2
+ PROPERTIES("replication_num" = "1")
+ """
+ def insertRows = rowVals.withIndex().collect { v, i -> "(${i + 1}, ${v}, 'r${i}')" }.join(", ")
+ sql "INSERT INTO rf_prune_type_${suffix} VALUES ${insertRows}"
+ sql "drop table if exists rf_prune_dim_type_${suffix}"
+ sql """
+ CREATE TABLE rf_prune_dim_type_${suffix} (
+ dim_key ${dimType} NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_val) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql "INSERT INTO rf_prune_dim_type_${suffix} VALUES (${dimVal}, 'x')"
+ // dimVal hits exactly one of the four partitions → 3 pruned.
+ assertPruningProfile(
+ "* FROM rf_prune_type_${suffix} f JOIN rf_prune_dim_type_${suffix} d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 4, 3)
+ }
+
+ // Test 35: TINYINT (boundaries straddle zero to exercise signed values)
+ runTypeCoverage("tinyint", "TINYINT", "TINYINT",
+ '"-100"|"-50"|"0"|"50"|"100"', "-75",
+ ["-90", "-60", "-25", "10", "25", "75"])
+
+ // Test 36: SMALLINT
+ runTypeCoverage("smallint", "SMALLINT", "SMALLINT",
+ '"0"|"1000"|"2000"|"3000"|"4000"', "1500",
+ ["100", "1500", "2500", "3500"])
+
+ // Test 37: LARGEINT (values above the 64-bit-friendly range of INT/BIGINT literals)
+ runTypeCoverage("largeint", "LARGEINT", "LARGEINT",
+ '"0"|"100000000000"|"200000000000"|"300000000000"|"400000000000"',
+ "150000000000",
+ ["50000000000", "150000000000", "250000000000", "350000000000"])
+
+ // Test 38: DECIMAL is intentionally skipped — Doris does not allow
+ // DECIMAL columns as RANGE partition keys, and the rest of this suite
+ // already exercises decimal-typed RF targets via casts.
+
+ // Test 39: DATE
+ runTypeCoverage("date", "DATE", "DATE",
+ '"2023-01-01"|"2023-04-01"|"2023-07-01"|"2023-10-01"|"2024-01-01"',
+ '"2023-05-15"',
+ ['"2023-02-15"', '"2023-05-15"', '"2023-08-15"', '"2023-11-15"'])
+
+ // Test 40: DATETIME
+ runTypeCoverage("datetime", "DATETIME", "DATETIME",
+ '"2023-01-01 00:00:00"|"2023-04-01 00:00:00"|"2023-07-01 00:00:00"|"2023-10-01 00:00:00"|"2024-01-01 00:00:00"',
+ '"2023-05-15 12:00:00"',
+ ['"2023-02-15 00:00:00"', '"2023-05-15 12:00:00"', '"2023-08-15 00:00:00"', '"2023-11-15 00:00:00"'])
+
+ // Test 41: VARCHAR is intentionally skipped — Doris does not allow
+ // VARCHAR columns as RANGE partition keys.
+
+ // ============================================================
+ // Tests 42-47: Monotonic / non-monotonic target_expr pruning
+ //
+ // FE classifier is currently conservative: only an identity SlotRef on a
+ // partition column is treated as MONOTONIC_INCREASING; any wrapping
+ // expression (including Cast and other Nereids `Monotonic` functions) is
+ // classified NON_MONOTONIC and therefore does not drive partition pruning.
+ // This will be revisited once the `Monotonic.isMonotonic(lower, upper)`
+ // implementations are completed/audited so we can safely walk through
+ // monotonic chains. Until then these tests pin the conservative behavior.
+ //
+ // NOTE: Nereids' RuntimeFilterPushDownVisitor itself only allows
+ // non-trivial target expressions when the input is either (a) a numeric
+ // single-slot expression, or (b) a chain of Cast wrapping a single slot.
+ // Date functions like year(dt)/date_trunc(dt,...) on a DATE partition
+ // column do not currently produce a runtime filter at all, so they cannot
+ // be exercised here regardless of the classifier choice.
+ // ============================================================
+
+ sql "drop table if exists rf_prune_dim_bigint"
+ sql """
+ CREATE TABLE rf_prune_dim_bigint (
+ dim_key BIGINT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_bigint VALUES (50, 'x')"""
+
+ // Test 42: target_expr = cast(part_col as bigint). Cast is monotonic in
+ // theory but the conservative FE classifier treats any non-Slot target
+ // expression as NON_MONOTONIC, so partition pruning does not fire.
+ def token_cast = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token_cast}", f.id
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_bigint d ON cast(f.part_col as bigint) = d.dim_key
+ """
+ def profile_cast = getProfileByToken(token_cast)
+ def pruned_cast = extractCounterSum(profile_cast, "PartitionsPrunedByRuntimeFilter")
+ logger.info("conservative cast: pruned=${pruned_cast}")
+ assertTrue(pruned_cast == 0,
+ "Conservative classifier: Cast wrapper should not drive pruning, got ${pruned_cast}")
+
+ // Test 43: chained Cast — same reasoning as Test 42, expect no pruning.
+ def token_cast2 = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token_cast2}", f.id
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_int d ON cast(cast(f.part_col as bigint) as int) = d.dim_key
+ """
+ def profile_cast2 = getProfileByToken(token_cast2)
+ def pruned_cast2 = extractCounterSum(profile_cast2, "PartitionsPrunedByRuntimeFilter")
+ logger.info("conservative chained cast: pruned=${pruned_cast2}")
+ assertTrue(pruned_cast2 == 0,
+ "Conservative classifier: chained Cast should not drive pruning, got ${pruned_cast2}")
+
+ // Test 44: target_expr = cast(dt as datetime) on a DATE partition column.
+ // Cast on DATE→DATETIME is non-trivial and the RF generator emits the
+ // filter, but the conservative classifier still rejects the wrapper.
+ sql "drop table if exists rf_prune_dim_dt"
+ sql """
+ CREATE TABLE rf_prune_dim_dt (
+ dim_dt DATETIME NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_dt) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_dt VALUES ('2024-02-20 00:00:00', 'x')"""
+ def token_castdt = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token_castdt}", f.id
+ FROM rf_prune_range_date f
+ JOIN rf_prune_dim_dt d ON cast(f.dt as datetime) = d.dim_dt
+ """
+ def profile_castdt = getProfileByToken(token_castdt)
+ def pruned_castdt = extractCounterSum(profile_castdt, "PartitionsPrunedByRuntimeFilter")
+ logger.info("conservative cast date->datetime: pruned=${pruned_castdt}")
+ assertTrue(pruned_castdt == 0,
+ "Conservative classifier: Cast date->datetime should not drive pruning, got ${pruned_castdt}")
+
+ // Test 45: target_expr = -part_col (Negative). Numeric input slot lets
+ // the RF generator emit a filter, but Negative is not declared Monotonic
+ // in Nereids → classifier returns NON_MONOTONIC → no partition pruning.
+ sql "drop table if exists rf_prune_dim_neg2"
+ sql """
+ CREATE TABLE rf_prune_dim_neg2 (
+ dim_key INT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_neg2 VALUES (-50, 'x')"""
+ def token_neg = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token_neg}", f.id
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_neg2 d ON (-f.part_col) = d.dim_key
+ """
+ def profile_neg = getProfileByToken(token_neg)
+ def pruned_neg = extractCounterSum(profile_neg, "PartitionsPrunedByRuntimeFilter")
+ logger.info("non_monotonic neg: pruned=${pruned_neg}")
+ assertTrue(pruned_neg == 0,
+ "Negative is not declared Monotonic in Nereids; should not prune, got ${pruned_neg}")
+
+ // Test 46: Test for non-monotonic Add was removed because Nereids
+ // constant-folds `part_col + 10 = 60` into `part_col = 50`, so the RF
+ // ends up identity and pruning happens. The Negative case in Test 45
+ // already covers the non-monotonic single-slot probe path.
+
+ // Test 47: Cast wrapping a non-partition column — cast(f.id as bigint).
+ // Cast is monotonic, but f.id is not a partition column of the table
+ // (rf_prune_range_int is partitioned on part_col). Classifier walks the
+ // Cast, lands on a non-partition slot, returns NON_MONOTONIC → no
+ // partition pruning even though the chain is monotonic.
+ sql "drop table if exists rf_prune_dim_id_bigint"
+ sql """
+ CREATE TABLE rf_prune_dim_id_bigint (
+ dim_key BIGINT NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_id_bigint VALUES (5, 'x')"""
+ def token_id = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token_id}", f.id
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_id_bigint d ON cast(f.id as bigint) = d.dim_key
+ """
+ def profile_id = getProfileByToken(token_id)
+ def pruned_id = extractCounterSum(profile_id, "PartitionsPrunedByRuntimeFilter")
+ logger.info("monotonic chain on non-partition col: pruned=${pruned_id}")
+ assertTrue(pruned_id == 0,
+ "Monotonic chain on non-partition col should not prune, got ${pruned_id}")
+
+ // ============================================================
+ // Test 47b: String partition column (LIST partition on VARCHAR).
+ //
+ // Regression coverage for the BE pruner string path. ColumnValueRange
+ // for string types uses CppType=std::string while RF literals/HybridSet
+ // expose StringRef bytes; the pruner must construct std::string from
+ // those bytes (not reinterpret_cast). Without that fix this case would
+ // read garbage / crash in BE under ASAN. We exercise both IN_FILTER (via
+ // small build side) and BLOOM/MIN_MAX paths by forcing IN_OR_BLOOM_FILTER.
+ // ============================================================
+ sql "drop table if exists rf_prune_list_str"
+ sql """
+ CREATE TABLE rf_prune_list_str (
+ id INT,
+ part_col VARCHAR(16) NOT NULL
+ )
+ PARTITION BY LIST(part_col) (
+ PARTITION pa VALUES IN ('a','b'),
+ PARTITION pc VALUES IN ('c','d'),
+ PARTITION pe VALUES IN ('e','f'),
+ PARTITION pg VALUES IN ('g','h')
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ // One row per list value so each partition is non-empty.
+ sql """INSERT INTO rf_prune_list_str VALUES
+ (1,'a'),(2,'b'),(3,'c'),(4,'d'),(5,'e'),(6,'f'),(7,'g'),(8,'h')"""
+
+ sql "drop table if exists rf_prune_dim_str"
+ sql """
+ CREATE TABLE rf_prune_dim_str (
+ dim_key VARCHAR(16) NOT NULL,
+ dim_val VARCHAR(32)
+ )
+ DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_dim_str VALUES ('c', 'x')"""
+
+ // RF {'c'} should keep only partition pc and prune the other three.
+ assertPruningProfile(
+ "* FROM rf_prune_list_str f JOIN rf_prune_dim_str d ON f.part_col = d.dim_key",
+ "IN_OR_BLOOM_FILTER", 4, 3)
+
+ // ============================================================
+ // Test 48: Grouped RF with multiple targets.
+ //
+ // The RF generated from d.dim_key -> f1.part_col is expanded through the
+ // inner join f1.part_col = f2.part_col, so Nereids creates two RF objects
+ // with the same source/type/builder and different scan targets. The legacy
+ // translator groups them into one RuntimeFilter with two targets. Both
+ // targets must retain partition-pruning monotonicity.
+ // ============================================================
+ sql "drop table if exists rf_prune_range_int_copy"
+ sql """
+ CREATE TABLE rf_prune_range_int_copy (
+ id INT NOT NULL,
+ part_col INT NOT NULL,
+ value VARCHAR(64)
+ )
+ PARTITION BY RANGE(part_col) (
+ PARTITION p1 VALUES [("0"), ("100")),
+ PARTITION p2 VALUES [("100"), ("200")),
+ PARTITION p3 VALUES [("200"), ("300")),
+ PARTITION p4 VALUES [("300"), ("400"))
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 4
+ PROPERTIES("replication_num" = "1")
+ """
+ sql """INSERT INTO rf_prune_range_int_copy VALUES
+ (1, 10, 'a'), (2, 20, 'b'), (3, 50, 'c'), (4, 80, 'd'), (5, 90, 'e'),
+ (6, 110, 'f'), (7, 120, 'g'), (8, 150, 'h'), (9, 180, 'i'), (10, 190, 'j'),
+ (11, 210, 'k'), (12, 220, 'l'), (13, 250, 'm'), (14, 280, 'n'), (15, 290, 'o'),
+ (16, 310, 'p'), (17, 320, 'q'), (18, 350, 'r'), (19, 380, 's'), (20, 390, 't')
+ """
+ // Totals sum over BOTH scans: 4 + 4 partitions, 3 pruned on each → 8 / 6.
+ assertPruningProfile(
+ "count(*) FROM rf_prune_range_int f1 "
+ + "JOIN rf_prune_range_int_copy f2 ON f1.part_col = f2.part_col "
+ + "JOIN rf_prune_dim_int d ON d.dim_key = f1.part_col",
+ "IN_OR_BLOOM_FILTER", 8, 6)
+
+ // ============================================================
+ // Test 49: Switch off enable_runtime_filter_partition_prune → no pruning
+ // ============================================================
+ sql "set enable_runtime_filter_partition_prune=false"
+ def token_off = UUID.randomUUID().toString()
+ sql """
+ SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+ "${token_off}", f.id
+ FROM rf_prune_range_int f
+ JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+ """
+ def profile_off = getProfileByToken(token_off)
+ def total_off = extractCounterSum(profile_off, "TotalPartitionsForRFPruning")
+ def pruned_off = extractCounterSum(profile_off, "PartitionsPrunedByRuntimeFilter")
+ logger.info("switch_off: total=${total_off}, pruned=${pruned_off}")
+ assertTrue(total_off == 0,
+ "When switched off, no partitions should be considered (got total=${total_off})")
+ assertTrue(pruned_off == 0,
+ "When switched off, no partitions should be pruned (got pruned=${pruned_off})")
+ // Restore the session variable so later suites see the default-on state.
+ sql "set enable_runtime_filter_partition_prune=true"
+}