[feature](runtime-filter) Add runtime filter partition pruning

### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary: Add runtime filter partition pruning support and cover planner/runtime handling for partition boundaries. Preserve partition-pruning monotonicity for grouped runtime filters with multiple scan targets, and run partition pruning for runtime filters acquired before scanner construction.

### Release note

Add runtime filter partition pruning support.

### Check List (For Author)

- Test: BE build, FE build, BE format, and FE checkstyle
    - Manual test: ./build.sh --be
    - Manual test: ./build.sh --fe
    - Manual test: build-support/check-format.sh
    - Manual test: cd fe && mvn checkstyle:check -pl fe-core
    - Regression test: Added rf_partition_pruning coverage for grouped runtime filters; not run locally because the running cluster was not restarted with the patched build.
- Behavior changed: Yes (adds runtime filter partition pruning when enabled)
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
diff --git a/be/src/exec/operator/olap_scan_operator.cpp b/be/src/exec/operator/olap_scan_operator.cpp
index 3cde3c9..ac05648 100644
--- a/be/src/exec/operator/olap_scan_operator.cpp
+++ b/be/src/exec/operator/olap_scan_operator.cpp
@@ -22,6 +22,7 @@
 #include <memory>
 #include <numeric>
 #include <optional>
+#include <shared_mutex>
 
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_storage_engine.h"
@@ -33,10 +34,13 @@
 #include "exec/scan/olap_scanner.h"
 #include "exec/scan/parallel_scanner_builder.h"
 #include "exprs/function/in.h"
+#include "exprs/hybrid_set.h"
 #include "exprs/score_runtime.h"
 #include "exprs/vectorized_fn_call.h"
 #include "exprs/vexpr.h"
 #include "exprs/vexpr_context.h"
+#include "exprs/vliteral.h"
+#include "exprs/vruntimefilter_wrapper.h"
 #include "exprs/vslot_ref.h"
 #include "io/cache/block_file_cache_profile.h"
 #include "runtime/query_cache/query_cache.h"
@@ -91,6 +95,9 @@
 
     RETURN_IF_ERROR(Base::init(state, info));
     RETURN_IF_ERROR(_sync_cloud_tablets(state));
+
+    _attach_partition_boundaries();
+
     return Status::OK();
 }
 
@@ -121,6 +128,8 @@
     // Rows read from storage.
     // Include the rows read from doris page cache.
     _scan_rows = ADD_COUNTER(custom_profile(), "ScanRows", TUnit::UNIT);
+    _tablets_pruned_by_rf_counter =
+            ADD_COUNTER(custom_profile(), "TabletsPrunedByRuntimeFilter", TUnit::UNIT);
 
     // 1. init segment profile
     _segment_profile.reset(new RuntimeProfile("SegmentIterator"));
@@ -499,6 +508,50 @@
         _cond_ranges.emplace_back(new doris::OlapScanRange());
     }
 
+    // Filter out tablets whose partitions have been pruned by runtime filters.
+    //
+    // TODO(rf-partition-prune): this happens after OlapScanLocalState::init()
+    // has already executed _sync_cloud_tablets() (in cloud mode that performs
+    // get_tablet() and waits for sync_rowsets() on every scan range) and after
+    // capture_read_source() has been called for every tablet. RFs that are
+    // ready at start therefore still pay the full per-tablet metadata / read
+    // source setup cost for partitions that are immediately dropped here, so
+    // the current feature only saves scanner construction and scan IO, not
+    // the expensive setup work that partition pruning was originally intended
+    // to avoid. To fix this we need to (a) add partition_id onto
+    // TPaloScanRange so BE knows the partition without first materializing
+    // the tablet, (b) acquire ready-at-start RFs before _sync_cloud_tablets()
+    // and run partition pruning there to filter _scan_ranges by partition_id
+    // so the heavy per-tablet work is skipped for pruned partitions.
+    if (_rf_partition_pruner.pruned_partition_count() > 0) {
+        DCHECK_EQ(_tablets.size(), _scan_ranges.size());
+        DCHECK_EQ(_tablets.size(), _read_sources.size());
+        size_t write_idx = 0;
+        for (size_t read_idx = 0; read_idx < _tablets.size(); ++read_idx) {
+            int64_t pid = _tablets[read_idx].tablet->partition_id();
+            if (!_rf_partition_pruner.is_partition_pruned(pid)) {
+                if (write_idx != read_idx) {
+                    _tablets[write_idx] = std::move(_tablets[read_idx]);
+                    _scan_ranges[write_idx] = std::move(_scan_ranges[read_idx]);
+                    _read_sources[write_idx] = std::move(_read_sources[read_idx]);
+                }
+                ++write_idx;
+            }
+        }
+        if (write_idx < _tablets.size()) {
+            COUNTER_SET(_tablets_pruned_by_rf_counter,
+                        static_cast<int64_t>(_tablets.size() - write_idx));
+            _tablets.resize(write_idx);
+            _scan_ranges.resize(write_idx);
+            _read_sources.resize(write_idx);
+        }
+        if (_tablets.empty()) {
+            _eos = true;
+            _scan_dependency->set_ready();
+            return Status::OK();
+        }
+    }
+
     bool enable_parallel_scan = state()->enable_parallel_scan();
 
     // The flag of preagg's meaning is whether return pre agg data(or partial agg data)
@@ -1090,4 +1143,31 @@
     }
 }
 
+// ======== Runtime Filter Partition Pruning ========
+
+Status OlapScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<OlapScanLocalState>::prepare(state));
+    // Parse partition boundaries once per fragment-on-host. Cost (VLiteral
+    // construction + ColumnPtr materialization + ColumnValueRange build per
+    // boundary literal) is plan-time-static, so doing it here -- rather than
+    // in per-instance LocalState::init -- avoids paying it parallel_tasks
+    // times. The parsed result lives on the generic ScanOperatorX base and is
+    // read by every per-instance pruner via OperatorXBase::parsed_partition_boundaries().
+    if (state->query_options().enable_runtime_filter_partition_prune &&
+        _olap_scan_node.__isset.partition_boundaries &&
+        !_olap_scan_node.partition_boundaries.empty()) {
+        _parsed_partition_boundaries.parse(_olap_scan_node.partition_boundaries,
+                                           _slot_id_to_slot_desc);
+    }
+    return Status::OK();
+}
+
+void OlapScanLocalState::_attach_partition_boundaries() {
+    const auto* parsed = _parent->parsed_partition_boundaries();
+    if (parsed == nullptr || parsed->empty()) {
+        return;
+    }
+    COUNTER_SET(_total_partitions_rf_counter, parsed->total_partitions());
+}
+
 } // namespace doris
diff --git a/be/src/exec/operator/olap_scan_operator.h b/be/src/exec/operator/olap_scan_operator.h
index e86b3c6..e6f42eb 100644
--- a/be/src/exec/operator/olap_scan_operator.h
+++ b/be/src/exec/operator/olap_scan_operator.h
@@ -19,13 +19,16 @@
 
 #include <stdint.h>
 
+#include <shared_mutex>
 #include <string>
+#include <unordered_set>
 
 #include "cloud/cloud_tablet.h"
 #include "common/status.h"
 #include "exec/operator/operator.h"
 #include "exec/operator/scan_operator.h"
 #include "runtime/runtime_profile.h"
+#include "storage/olap_scan_common.h"
 #include "storage/tablet/tablet_reader.h"
 
 namespace doris {
@@ -320,6 +323,14 @@
     std::map<SlotId, size_t> _slot_id_to_index_in_block;
     // this map is needed for scanner opening.
     std::map<SlotId, DataTypePtr> _slot_id_to_col_type;
+
+    // ---- Runtime-filter partition pruning ----
+    // Attaches this per-instance pruner to the shared parse result owned by
+    // OlapScanOperatorX (parsed once in OperatorX::prepare()). Cheap: pointer
+    // assignment plus a counter set, no parsing work.
+    void _attach_partition_boundaries();
+
+    RuntimeProfile::Counter* _tablets_pruned_by_rf_counter = nullptr;
 };
 
 class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
@@ -328,6 +339,8 @@
                       const DescriptorTbl& descs, int parallel_tasks,
                       const TQueryCacheParam& cache_param);
 
+    Status prepare(RuntimeState* state) override;
+
     int get_column_id(const std::string& col_name) const override {
         if (!_tablet_schema) {
             return -1;
diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h
index 0d500ca..3dc4b89 100644
--- a/be/src/exec/operator/operator.h
+++ b/be/src/exec/operator/operator.h
@@ -53,6 +53,7 @@
 class AsyncResultWriter;
 class ScoreRuntime;
 class AnnTopNRuntime;
+class ParsedPartitionBoundaries;
 } // namespace doris
 
 namespace doris {
@@ -829,6 +830,15 @@
     [[noreturn]] virtual const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
     }
+
+    // Per-fragment shared partition-boundary parse result, used for
+    // runtime-filter partition pruning. Returns nullptr for operators that
+    // don't support this feature (default). Scan operators override to expose
+    // their parsed boundaries; the per-instance pruning state lives on the
+    // ScanLocalState. This sits on the generic OperatorXBase so non-templated
+    // ScanLocalStateBase methods can fetch it without down-casting `_parent`
+    // to a specific scan type.
+    virtual const ParsedPartitionBoundaries* parsed_partition_boundaries() const { return nullptr; }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
     [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
     bool is_blockable(RuntimeState* state) const override {
diff --git a/be/src/exec/operator/scan_operator.cpp b/be/src/exec/operator/scan_operator.cpp
index 5a731ae..a2b5178 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -74,6 +74,7 @@
                                                               int& arrived_rf_num) {
     // Lock needed because _conjuncts can be accessed concurrently by multiple scanner threads
     std::unique_lock lock(_conjuncts_lock);
+    size_t conjuncts_before = _conjuncts.size();
     RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state, _parent->row_descriptor(),
                                                                    arrived_rf_num, _conjuncts));
     if (state->enable_adjust_conjunct_order_by_cost()) {
@@ -81,6 +82,14 @@
             return a->execute_cost() < b->execute_cost();
         });
     };
+    // Only re-run partition pruning when try_append_late_arrival_runtime_filter
+    // actually appended new conjuncts. Otherwise this hook would re-scan all
+    // partition boundaries on every scheduler pass while there are still
+    // unapplied RFs (Scanner::_applied_rf_num is not advanced here), wasting
+    // CPU re-evaluating the same set of RFs against the same boundaries.
+    if (_conjuncts.size() > conjuncts_before) {
+        _on_runtime_filter_update();
+    }
     return Status::OK();
 }
 
@@ -94,6 +103,33 @@
     return Status::OK();
 }
 
+bool ScanLocalStateBase::is_partition_pruned(int64_t partition_id) const {
+    return _rf_partition_pruner.is_partition_pruned(partition_id);
+}
+
+void ScanLocalStateBase::_on_runtime_filter_update() {
+    const auto* parsed = _parent->parsed_partition_boundaries();
+    if (parsed != nullptr && !parsed->empty()) {
+        _do_partition_pruning_by_rf();
+    }
+}
+
+void ScanLocalStateBase::_do_partition_pruning_by_rf() {
+    if (!_state->query_options().enable_runtime_filter_partition_prune) {
+        return;
+    }
+    const auto* parsed = _parent->parsed_partition_boundaries();
+    if (parsed == nullptr || parsed->empty()) {
+        return;
+    }
+    int64_t newly_pruned = _rf_partition_pruner.prune_by_runtime_filters(
+            *parsed, _conjuncts, _parent->runtime_filter_descs(), _parent->node_id());
+    if (newly_pruned > 0) {
+        COUNTER_SET(_partitions_pruned_by_rf_counter,
+                    _rf_partition_pruner.pruned_partition_count());
+    }
+}
+
 int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const {
     // For select * from table limit 10; should just use one thread.
     if (should_run_serial()) {
@@ -190,7 +226,11 @@
         RETURN_IF_ERROR(
                 p._common_expr_ctxs_push_down[i]->clone(state, _common_expr_ctxs_push_down[i]));
     }
+    size_t conjuncts_before = _conjuncts.size();
     RETURN_IF_ERROR(_helper.acquire_runtime_filter(state, _conjuncts, p.row_descriptor()));
+    if (_conjuncts.size() > conjuncts_before) {
+        _on_runtime_filter_update();
+    }
 
     // Disable condition cache in topn filter valid. TODO:: Try to support the topn filter in condition cache
     if (state->query_options().condition_cache_digest && p._topn_filter_source_node_ids.empty()) {
@@ -1073,6 +1113,11 @@
     _condition_cache_filtered_rows_counter =
             ADD_COUNTER(_scanner_profile, "ConditionCacheFilteredRows", TUnit::UNIT);
 
+    _partitions_pruned_by_rf_counter =
+            ADD_COUNTER(custom_profile(), "PartitionsPrunedByRuntimeFilter", TUnit::UNIT);
+    _total_partitions_rf_counter =
+            ADD_COUNTER(custom_profile(), "TotalPartitionsForRFPruning", TUnit::UNIT);
+
     // Rows read from storage.
     // Include the rows read from doris page cache.
     _scan_rows = ADD_COUNTER_WITH_LEVEL(custom_profile(), profile::SCAN_ROWS, TUnit::UNIT, 1);
diff --git a/be/src/exec/operator/scan_operator.h b/be/src/exec/operator/scan_operator.h
index 2551f76..39ba630 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -28,6 +28,7 @@
 #include "exec/operator/operator.h"
 #include "exec/pipeline/dependency.h"
 #include "exec/runtime_filter/runtime_filter_consumer_helper.h"
+#include "exec/runtime_filter/runtime_filter_partition_pruner.h"
 #include "exec/scan/scan_node.h"
 #include "exec/scan/scanner_context.h"
 #include "exprs/function_filter.h"
@@ -84,6 +85,10 @@
     [[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
     [[nodiscard]] virtual ScannerScheduler* scan_scheduler(RuntimeState* state) const;
 
+    // Thread-safe check whether a partition has been pruned by runtime filter.
+    // Callable from any scan type's scanner in scheduling threads.
+    bool is_partition_pruned(int64_t partition_id) const;
+
     [[nodiscard]] std::string get_name() { return _parent->get_name(); }
 
     uint64_t get_condition_cache_digest() const { return _condition_cache_digest; }
@@ -98,6 +103,13 @@
 
     virtual Status _init_profile() = 0;
 
+    // Hook for subclasses to react after new runtime filters are appended.
+    // Called inside update_late_arrival_runtime_filter() while _conjuncts_lock is held.
+    // Default implementation runs partition pruning on the newly appended RFs.
+    virtual void _on_runtime_filter_update();
+
+    void _do_partition_pruning_by_rf();
+
     std::atomic<bool> _opened {false};
 
     DependencySPtr _scan_dependency = nullptr;
@@ -133,6 +145,11 @@
     RuntimeProfile::Counter* _condition_cache_hit_counter = nullptr;
     RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr;
 
+    // ---- Runtime-filter partition pruning (scan-agnostic) ----
+    RuntimeFilterPartitionPruner _rf_partition_pruner;
+    RuntimeProfile::Counter* _partitions_pruned_by_rf_counter = nullptr;
+    RuntimeProfile::Counter* _total_partitions_rf_counter = nullptr;
+
     // Moved from ScanLocalState<Derived> to avoid re-instantiation for each Derived type.
     std::atomic<bool> _eos = false;
     int _max_pushdown_conditions_per_column = 1024;
@@ -279,7 +296,10 @@
     friend class Scanner;
 
     Status _init_profile() override;
-    virtual Status _process_conjuncts(RuntimeState* state) { return _normalize_conjuncts(state); }
+    virtual Status _process_conjuncts(RuntimeState* state) {
+        _do_partition_pruning_by_rf();
+        return _normalize_conjuncts(state);
+    }
     virtual bool _should_push_down_common_expr() { return false; }
 
     virtual bool _storage_no_merge() { return false; }
@@ -363,6 +383,16 @@
         return _runtime_filter_descs;
     }
 
+    // Expose this operator's per-fragment shared partition-boundary parse
+    // result to the non-templated ScanLocalStateBase so it can drive runtime
+    // filter partition pruning without down-casting to a specific scan type.
+    // Subclasses are expected to populate `_parsed_partition_boundaries` from
+    // their own partition-boundary thrift field inside their `prepare()`
+    // override before any LocalState observes the result.
+    const ParsedPartitionBoundaries* parsed_partition_boundaries() const override {
+        return &_parsed_partition_boundaries;
+    }
+
     [[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }
 
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
@@ -440,6 +470,12 @@
 
     std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr;
     std::shared_ptr<MemLimiter> _mem_limiter = nullptr;
+
+    // Shared parse result of partition boundaries for runtime-filter partition
+    // pruning. Lives here (rather than on the Olap-specific subclass) so any
+    // future scan type can populate it in its `prepare()` override and reuse
+    // the generic pruning machinery in ScanLocalStateBase.
+    ParsedPartitionBoundaries _parsed_partition_boundaries;
 };
 
 } // namespace doris
diff --git a/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp
new file mode 100644
index 0000000..527a2a5e
--- /dev/null
+++ b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp
@@ -0,0 +1,749 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/runtime_filter/runtime_filter_partition_pruner.h"
+
+#include <gen_cpp/PlanNodes_types.h>
+
+#include <optional>
+#include <utility>
+
+#include "core/block/block.h"
+#include "core/column/column.h"
+#include "core/column/column_nullable.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/field.h"
+#include "exprs/bloom_filter_func.h"
+#include "exprs/hybrid_set.h"
+#include "exprs/vexpr.h"
+#include "exprs/vexpr_context.h"
+#include "exprs/vliteral.h"
+#include "exprs/vruntimefilter_wrapper.h"
+#include "exprs/vslot_ref.h"
+#include "runtime/descriptors.h"
+
+namespace doris {
+
+// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size)
+// Complexity is inflated by macro expansion for each PrimitiveType case.
+void ParsedPartitionBoundaries::parse(
+        const std::vector<TPartitionBoundary>& boundaries,
+        const phmap::flat_hash_map<int, SlotDescriptor*>& slot_descs) {
+    for (const auto& tb : boundaries) {
+        if (!tb.__isset.partition_id || !tb.__isset.slot_id) {
+            continue;
+        }
+        SlotId slot_id = tb.slot_id;
+
+        auto slot_it = slot_descs.find(slot_id);
+        if (slot_it == slot_descs.end()) {
+            continue;
+        }
+        SlotDescriptor* slot = slot_it->second;
+        // Reuse the slot's pre-built DataType: walking through VLiteral here
+        // would cost a `DataTypeFactory::create_data_type(node)` heap allocation
+        // and a one-row `ColumnConst` allocation per boundary endpoint. With
+        // thousands of partitions that dominates BuildTasksTime.
+        const DataTypePtr& slot_type = slot->type();
+        PrimitiveType ptype = slot_type->get_primitive_type();
+        int precision = cast_set<int>(slot_type->get_precision());
+        int scale = cast_set<int>(slot_type->get_scale());
+        bool is_nullable = slot->is_nullable();
+
+        // Store slot data type for potential projection use
+        _slot_data_types[slot_id] = slot_type;
+
+        ParsedBoundary boundary;
+        boundary.partition_id = tb.partition_id;
+        boundary.slot_id = slot_id;
+        boundary.is_nullable = is_nullable;
+
+        bool parsed_ok = false;
+
+#define BUILD_BOUNDARY_CVR(NAME)                                                               \
+    case TYPE_##NAME: {                                                                        \
+        using CppType = typename PrimitiveTypeTraits<TYPE_##NAME>::CppType;                    \
+        bool is_list = tb.__isset.list_values && !tb.list_values.empty();                      \
+        bool is_range = tb.__isset.range_start || tb.__isset.range_end;                        \
+        if (!is_list && !is_range) break;                                                      \
+        ColumnValueRange<TYPE_##NAME> cvr(slot->col_name(), is_nullable, precision, scale);    \
+        /* Returns nullopt if `node` is a NULL literal; the caller then sets contain_null  */  \
+        /* on the CVR instead of trying to extract a typed value (which would dereference  */  \
+        /* a null data pointer for the non-string branch).                                 */  \
+        auto parse_texpr_node = [&](const TExprNode& node) -> std::optional<CppType> {         \
+            if (node.node_type == TExprNodeType::NULL_LITERAL) {                               \
+                return std::nullopt;                                                           \
+            }                                                                                  \
+            /* `Field` value is copied into the CVR by `add_fixed_value` /             */      \
+            /* `add_range` (both take CppType by const-ref / by value), so the         */      \
+            /* temporary `Field`'s lifetime ending at this expression's full-statement */      \
+            /* boundary is safe -- including for `String` payloads.                    */      \
+            Field field = slot_type->get_field(node);                                          \
+            return std::make_optional<CppType>(field.get<TYPE_##NAME>());                      \
+        };                                                                                     \
+        if (is_list) {                                                                         \
+            auto empty_cvr = ColumnValueRange<TYPE_##NAME>::create_empty_column_value_range(   \
+                    is_nullable, precision, scale);                                            \
+            bool list_has_null = false;                                                        \
+            bool list_has_value = false;                                                       \
+            for (const auto& node : tb.list_values) {                                          \
+                auto parsed = parse_texpr_node(node);                                          \
+                if (!parsed) {                                                                 \
+                    list_has_null = true;                                                      \
+                    continue;                                                                  \
+                }                                                                              \
+                static_cast<void>(empty_cvr.add_fixed_value(*parsed));                         \
+                list_has_value = true;                                                         \
+            }                                                                                  \
+            if (list_has_value) {                                                              \
+                cvr.intersection(empty_cvr);                                                   \
+            }                                                                                  \
+            if (list_has_null && is_nullable) {                                                \
+                /* Track NULL membership on ParsedBoundary; calling          */                \
+                /* cvr.set_contain_null(true) here would invoke              */                \
+                /* set_empty_value_range() and discard the concrete fixed    */                \
+                /* values we just inserted, turning {NULL, v} into a         */                \
+                /* NULL-only boundary.                                       */                \
+                boundary.contains_null = true;                                                 \
+                if (!list_has_value) {                                                         \
+                    boundary.only_null = true;                                                 \
+                }                                                                              \
+            }                                                                                  \
+        } else {                                                                               \
+            if (tb.__isset.range_start) {                                                      \
+                auto parsed = parse_texpr_node(tb.range_start);                                \
+                if (parsed) {                                                                  \
+                    static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, *parsed));         \
+                }                                                                              \
+            }                                                                                  \
+            if (tb.__isset.range_end) {                                                        \
+                auto parsed = parse_texpr_node(tb.range_end);                                  \
+                if (parsed) {                                                                  \
+                    /* Multi-column RANGE projection emits a CLOSED upper bound (see       */  \
+                    /* TPartitionBoundary.range_end_inclusive comment); single-column RANGE */ \
+                    /* keeps the natural OPEN upper bound matching Doris semantics.         */ \
+                    SQLFilterOp upper_op =                                                     \
+                            (tb.__isset.range_end_inclusive && tb.range_end_inclusive)         \
+                                    ? FILTER_LESS_OR_EQUAL                                     \
+                                    : FILTER_LESS;                                             \
+                    static_cast<void>(cvr.add_range(upper_op, *parsed));                       \
+                }                                                                              \
+            }                                                                                  \
+        }                                                                                      \
+        boundary.boundary_cvr = std::move(cvr);                                                \
+        parsed_ok = true;                                                                      \
+        break;                                                                                 \
+    }
+
+        switch (ptype) {
+            BUILD_BOUNDARY_CVR(TINYINT)
+            BUILD_BOUNDARY_CVR(SMALLINT)
+            BUILD_BOUNDARY_CVR(INT)
+            BUILD_BOUNDARY_CVR(BIGINT)
+            BUILD_BOUNDARY_CVR(LARGEINT)
+            BUILD_BOUNDARY_CVR(FLOAT)
+            BUILD_BOUNDARY_CVR(DOUBLE)
+            BUILD_BOUNDARY_CVR(CHAR)
+            BUILD_BOUNDARY_CVR(DATE)
+            BUILD_BOUNDARY_CVR(DATETIME)
+            BUILD_BOUNDARY_CVR(DATEV2)
+            BUILD_BOUNDARY_CVR(DATETIMEV2)
+            BUILD_BOUNDARY_CVR(TIMESTAMPTZ)
+            BUILD_BOUNDARY_CVR(VARCHAR)
+            BUILD_BOUNDARY_CVR(STRING)
+            BUILD_BOUNDARY_CVR(DECIMAL32)
+            BUILD_BOUNDARY_CVR(DECIMAL64)
+            BUILD_BOUNDARY_CVR(DECIMAL128I)
+            BUILD_BOUNDARY_CVR(DECIMAL256)
+            BUILD_BOUNDARY_CVR(DECIMALV2)
+            BUILD_BOUNDARY_CVR(BOOLEAN)
+            BUILD_BOUNDARY_CVR(IPV4)
+            BUILD_BOUNDARY_CVR(IPV6)
+        default:
+            break;
+        }
+#undef BUILD_BOUNDARY_CVR
+
+        if (parsed_ok) {
+            _partition_column_slot_ids.insert(slot_id);
+            _slot_to_boundaries[slot_id].push_back(std::move(boundary));
+        }
+    }
+
+    // Count distinct partition IDs across all boundaries.
+    if (!_partition_column_slot_ids.empty()) {
+        phmap::flat_hash_set<int64_t> all_partition_ids;
+        for (const auto& [_, slot_boundaries] : _slot_to_boundaries) {
+            for (const auto& pb : slot_boundaries) {
+                all_partition_ids.insert(pb.partition_id);
+            }
+        }
+        _total_partition_count = static_cast<int64_t>(all_partition_ids.size());
+    }
+}
+// NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
+
+static const VSlotRef* find_unique_slot_ref(const VExpr* expr) {
+    if (!expr) {
+        return nullptr;
+    }
+    if (expr->is_slot_ref()) {
+        return assert_cast<const VSlotRef*>(expr);
+    }
+    const VSlotRef* found = nullptr;
+    for (const auto& child : expr->children()) {
+        const VSlotRef* c = find_unique_slot_ref(child.get());
+        if (c) {
+            if (found) {
+                return nullptr; // multiple slot refs, can't handle
+            }
+            found = c;
+        }
+    }
+    return found;
+}
+
+// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size)
+const std::vector<ParsedBoundary>* ParsedPartitionBoundaries::get_or_compute_projection(
+        int filter_id, const VExprSPtr& target_expr, SlotId leaf_slot_id, int leaf_column_id,
+        TTargetExprMonotonicity::type direction, VExprContext* ctx) const {
+    {
+        std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+        auto it = _projection_cache.find(filter_id);
+        if (it != _projection_cache.end()) {
+            return &it->second;
+        }
+    }
+
+    // Build projection
+    std::vector<ParsedBoundary> projected;
+
+    auto slot_boundaries_it = _slot_to_boundaries.find(leaf_slot_id);
+    if (slot_boundaries_it == _slot_to_boundaries.end()) {
+        std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+        _projection_cache[filter_id] = std::move(projected);
+        return &_projection_cache[filter_id];
+    }
+
+    const auto& orig_boundaries = slot_boundaries_it->second;
+    if (orig_boundaries.empty()) {
+        std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+        _projection_cache[filter_id] = std::move(projected);
+        return &_projection_cache[filter_id];
+    }
+
+    auto slot_type_it = _slot_data_types.find(leaf_slot_id);
+    if (slot_type_it == _slot_data_types.end()) {
+        std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+        _projection_cache[filter_id] = std::move(projected);
+        return &_projection_cache[filter_id];
+    }
+
+    // LIST partitions: TODO(rf-partition-prune) -- projecting through a
+    // monotonic function preserves set membership but requires re-grouping
+    // the projected values back per-partition. Skip for now (caller treats
+    // empty cache as "no projection available").
+    {
+        bool any_list = false;
+        std::visit([&](const auto& cvr) { any_list = cvr.is_fixed_value_range(); },
+                   orig_boundaries[0].boundary_cvr);
+        if (any_list) {
+            std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+            _projection_cache[filter_id] = std::move(projected);
+            return &_projection_cache[filter_id];
+        }
+    }
+
+    const DataTypePtr& input_type = slot_type_it->second;
+    bool input_is_nullable = input_type->is_nullable();
+    DataTypePtr inner_type = input_is_nullable ? remove_nullable(input_type) : input_type;
+    PrimitiveType input_ptype = input_type->get_primitive_type();
+
+    size_t N = orig_boundaries.size();
+
+    // Track open bounds (low_value == TYPE_MIN or high_value == TYPE_MAX)
+    // externally so we can propagate null_in_projection regardless of whether
+    // the input column is nullable.
+    std::vector<bool> ext_null_lo(N, false);
+    std::vector<bool> ext_null_hi(N, false);
+
+    // Build input blocks for lo and hi
+    Block lo_block;
+    Block hi_block;
+
+    // Pad columns 0..leaf_column_id-1 with placeholder columns so the leaf
+    // VSlotRef (whose column_id is leaf_column_id) reads our typed column.
+    for (int col_idx = 0; col_idx < leaf_column_id; ++col_idx) {
+        auto add_dummy = [&](Block& blk) {
+            auto col = ColumnUInt8::create(N, 0);
+            blk.insert({std::move(col), std::make_shared<DataTypeUInt8>(),
+                        fmt::format("dummy_{}", col_idx)});
+        };
+        add_dummy(lo_block);
+        add_dummy(hi_block);
+    }
+
+    // Macro-dispatch on input PrimitiveType to build the typed value column
+    // for each side. Inserts NULL (when input column is nullable) or default
+    // (when not) for boundaries that are only_null / open / type-mismatched;
+    // ext_null_lo/hi tracks these so we can mark them null_in_projection
+    // after executing the projection.
+    bool input_built = false;
+#define BUILD_INPUT_COLUMNS(INPUT_PT)                                                            \
+    case TYPE_##INPUT_PT: {                                                                      \
+        using InCol = typename PrimitiveTypeTraits<TYPE_##INPUT_PT>::ColumnType;                 \
+        MutableColumnPtr lo_inner_base = inner_type->create_column();                            \
+        MutableColumnPtr hi_inner_base = inner_type->create_column();                            \
+        auto* lo_inner = assert_cast<InCol*>(lo_inner_base.get());                               \
+        auto* hi_inner = assert_cast<InCol*>(hi_inner_base.get());                               \
+        lo_inner->reserve(N);                                                                    \
+        hi_inner->reserve(N);                                                                    \
+        auto lo_nulls = ColumnUInt8::create();                                                   \
+        auto hi_nulls = ColumnUInt8::create();                                                   \
+        lo_nulls->reserve(N);                                                                    \
+        hi_nulls->reserve(N);                                                                    \
+        for (size_t i = 0; i < N; ++i) {                                                         \
+            const auto& boundary = orig_boundaries[i];                                           \
+            bool null_lo = boundary.only_null;                                                   \
+            bool null_hi = boundary.only_null;                                                   \
+            const auto* cvr =                                                                    \
+                    std::get_if<ColumnValueRange<TYPE_##INPUT_PT>>(&boundary.boundary_cvr);      \
+            if (cvr == nullptr) {                                                                \
+                null_lo = null_hi = true;                                                        \
+            } else {                                                                             \
+                if (cvr->is_low_value_minimum()) null_lo = true;                                 \
+                if (cvr->is_high_value_maximum()) null_hi = true;                                \
+            }                                                                                    \
+            ext_null_lo[i] = null_lo;                                                            \
+            ext_null_hi[i] = null_hi;                                                            \
+            if (null_lo) {                                                                       \
+                lo_inner->insert_default();                                                      \
+                lo_nulls->get_data().push_back(1);                                               \
+            } else {                                                                             \
+                lo_inner->insert_value(cvr->get_range_min_value());                              \
+                lo_nulls->get_data().push_back(0);                                               \
+            }                                                                                    \
+            if (null_hi) {                                                                       \
+                hi_inner->insert_default();                                                      \
+                hi_nulls->get_data().push_back(1);                                               \
+            } else {                                                                             \
+                hi_inner->insert_value(cvr->get_range_max_value());                              \
+                hi_nulls->get_data().push_back(0);                                               \
+            }                                                                                    \
+        }                                                                                        \
+        if (input_is_nullable) {                                                                 \
+            auto lo_col = ColumnNullable::create(std::move(lo_inner_base), std::move(lo_nulls)); \
+            auto hi_col = ColumnNullable::create(std::move(hi_inner_base), std::move(hi_nulls)); \
+            lo_block.insert({std::move(lo_col), input_type, "leaf_slot"});                       \
+            hi_block.insert({std::move(hi_col), input_type, "leaf_slot"});                       \
+        } else {                                                                                 \
+            lo_block.insert({std::move(lo_inner_base), input_type, "leaf_slot"});                \
+            hi_block.insert({std::move(hi_inner_base), input_type, "leaf_slot"});                \
+        }                                                                                        \
+        input_built = true;                                                                      \
+        break;                                                                                   \
+    }
+
+    switch (input_ptype) {
+        BUILD_INPUT_COLUMNS(TINYINT)
+        BUILD_INPUT_COLUMNS(SMALLINT)
+        BUILD_INPUT_COLUMNS(INT)
+        BUILD_INPUT_COLUMNS(BIGINT)
+        BUILD_INPUT_COLUMNS(LARGEINT)
+        BUILD_INPUT_COLUMNS(FLOAT)
+        BUILD_INPUT_COLUMNS(DOUBLE)
+        BUILD_INPUT_COLUMNS(CHAR)
+        BUILD_INPUT_COLUMNS(DATE)
+        BUILD_INPUT_COLUMNS(DATETIME)
+        BUILD_INPUT_COLUMNS(DATEV2)
+        BUILD_INPUT_COLUMNS(DATETIMEV2)
+        BUILD_INPUT_COLUMNS(TIMESTAMPTZ)
+        BUILD_INPUT_COLUMNS(VARCHAR)
+        BUILD_INPUT_COLUMNS(STRING)
+        BUILD_INPUT_COLUMNS(DECIMAL32)
+        BUILD_INPUT_COLUMNS(DECIMAL64)
+        BUILD_INPUT_COLUMNS(DECIMAL128I)
+        BUILD_INPUT_COLUMNS(DECIMAL256)
+        BUILD_INPUT_COLUMNS(DECIMALV2)
+        BUILD_INPUT_COLUMNS(BOOLEAN)
+        BUILD_INPUT_COLUMNS(IPV4)
+        BUILD_INPUT_COLUMNS(IPV6)
+    default:
+        break;
+    }
+#undef BUILD_INPUT_COLUMNS
+
+    if (!input_built) {
+        std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+        _projection_cache[filter_id] = std::move(projected);
+        return &_projection_cache[filter_id];
+    }
+
+    int lo_result_id = -1;
+    int hi_result_id = -1;
+    auto status_lo = target_expr->execute(ctx, &lo_block, &lo_result_id);
+    auto status_hi = target_expr->execute(ctx, &hi_block, &hi_result_id);
+    if (!status_lo.ok() || !status_hi.ok() || lo_result_id < 0 || hi_result_id < 0) {
+        std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+        _projection_cache[filter_id] = std::move(projected);
+        return &_projection_cache[filter_id];
+    }
+
+    const auto& lo_result_col = lo_block.get_by_position(lo_result_id).column;
+    const auto& hi_result_col = hi_block.get_by_position(hi_result_id).column;
+
+    int out_precision = cast_set<int>(target_expr->data_type()->get_precision());
+    int out_scale = cast_set<int>(target_expr->data_type()->get_scale());
+    bool out_nullable = target_expr->data_type()->is_nullable();
+    PrimitiveType output_ptype = target_expr->data_type()->get_primitive_type();
+
+    projected.resize(N);
+
+#define BUILD_PROJECTED_CVR(OUTPUT_PT)                                                         \
+    case TYPE_##OUTPUT_PT: {                                                                   \
+        using OutputCppType = typename PrimitiveTypeTraits<TYPE_##OUTPUT_PT>::CppType;         \
+        for (size_t i = 0; i < N; ++i) {                                                       \
+            projected[i].partition_id = orig_boundaries[i].partition_id;                       \
+            projected[i].slot_id = leaf_slot_id;                                               \
+            projected[i].is_nullable = out_nullable;                                           \
+            projected[i].only_null = orig_boundaries[i].only_null;                             \
+            projected[i].contains_null = orig_boundaries[i].contains_null;                     \
+            bool lo_is_null = ext_null_lo[i] || lo_result_col->is_null_at(i);                  \
+            bool hi_is_null = ext_null_hi[i] || hi_result_col->is_null_at(i);                  \
+            if (lo_is_null || hi_is_null) {                                                    \
+                projected[i].null_in_projection = true;                                        \
+                ColumnValueRange<TYPE_##OUTPUT_PT> full_range("", out_nullable, out_precision, \
+                                                              out_scale);                      \
+                projected[i].boundary_cvr = std::move(full_range);                             \
+            } else {                                                                           \
+                Field lo_field = (*lo_result_col)[i];                                          \
+                Field hi_field = (*hi_result_col)[i];                                          \
+                OutputCppType lo_projected = lo_field.get<TYPE_##OUTPUT_PT>();                 \
+                OutputCppType hi_projected = hi_field.get<TYPE_##OUTPUT_PT>();                 \
+                ColumnValueRange<TYPE_##OUTPUT_PT> cvr("", out_nullable, out_precision,        \
+                                                       out_scale);                             \
+                if (direction == TTargetExprMonotonicity::MONOTONIC_DECREASING) {              \
+                    static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, hi_projected));    \
+                    static_cast<void>(cvr.add_range(FILTER_LESS_OR_EQUAL, lo_projected));      \
+                } else {                                                                       \
+                    static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, lo_projected));    \
+                    static_cast<void>(cvr.add_range(FILTER_LESS_OR_EQUAL, hi_projected));      \
+                }                                                                              \
+                projected[i].boundary_cvr = std::move(cvr);                                    \
+            }                                                                                  \
+        }                                                                                      \
+        break;                                                                                 \
+    }
+
+    switch (output_ptype) {
+        BUILD_PROJECTED_CVR(TINYINT)
+        BUILD_PROJECTED_CVR(SMALLINT)
+        BUILD_PROJECTED_CVR(INT)
+        BUILD_PROJECTED_CVR(BIGINT)
+        BUILD_PROJECTED_CVR(LARGEINT)
+        BUILD_PROJECTED_CVR(FLOAT)
+        BUILD_PROJECTED_CVR(DOUBLE)
+        BUILD_PROJECTED_CVR(CHAR)
+        BUILD_PROJECTED_CVR(DATE)
+        BUILD_PROJECTED_CVR(DATETIME)
+        BUILD_PROJECTED_CVR(DATEV2)
+        BUILD_PROJECTED_CVR(DATETIMEV2)
+        BUILD_PROJECTED_CVR(TIMESTAMPTZ)
+        BUILD_PROJECTED_CVR(VARCHAR)
+        BUILD_PROJECTED_CVR(STRING)
+        BUILD_PROJECTED_CVR(DECIMAL32)
+        BUILD_PROJECTED_CVR(DECIMAL64)
+        BUILD_PROJECTED_CVR(DECIMAL128I)
+        BUILD_PROJECTED_CVR(DECIMAL256)
+        BUILD_PROJECTED_CVR(DECIMALV2)
+        BUILD_PROJECTED_CVR(BOOLEAN)
+        BUILD_PROJECTED_CVR(IPV4)
+        BUILD_PROJECTED_CVR(IPV6)
+    default:
+        break;
+    }
+
+#undef BUILD_PROJECTED_CVR
+
+    std::lock_guard<std::mutex> lock(_projection_cache_mutex);
+    _projection_cache[filter_id] = std::move(projected);
+    return &_projection_cache[filter_id];
+}
+// NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
+
+static SQLFilterOp convert_opcode_to_filter_op(TExprOpcode::type op) {
+    switch (op) {
+    case TExprOpcode::LE:
+        return FILTER_LESS_OR_EQUAL;
+    case TExprOpcode::LT:
+        return FILTER_LESS;
+    case TExprOpcode::GE:
+        return FILTER_LARGER_OR_EQUAL;
+    case TExprOpcode::GT:
+        return FILTER_LARGER;
+    default:
+        return FILTER_IN; // sentinel: caller should skip
+    }
+}
+
+// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size)
+void RuntimeFilterPartitionPruner::_try_prune_by_single_rf(
+        const std::vector<ParsedBoundary>& boundaries, const VExprSPtr& impl,
+        phmap::flat_hash_set<int64_t>& newly_pruned) {
+    // Pre-compute whether the RF "matches NULL" -- i.e. whether the RF would
+    // accept a row whose probe value is NULL. The signal differs by RF impl:
+    //   * IN filter      → HybridSet::contain_null()
+    //   * Bloom filter   → BloomFilterFuncBase::contain_null()
+    //   * MinMax filter  → encoded via the BinaryPredicate node_type:
+    //                      NULL_AWARE_BINARY_PRED means the build side had a
+    //                      NULL key and a null-safe equal join asked NULL to
+    //                      match NULL; plain BINARY_PRED never matches NULL.
+    // FilterBase::contain_null() already folds in `_null_aware`, so we only
+    // get a true result when the build side is actually null-aware AND
+    // produced a NULL value.
+    bool rf_contains_null = false;
+    if (auto hybrid_set = impl->get_set_func()) {
+        rf_contains_null = hybrid_set->contain_null();
+    } else if (impl->node_type() == TExprNodeType::BLOOM_PRED) {
+        auto bloom = impl->get_bloom_filter_func();
+        rf_contains_null = bloom && bloom->contain_null();
+    } else if (impl->node_type() == TExprNodeType::NULL_AWARE_BINARY_PRED) {
+        // Min/Max RF built on a null-safe equal join. The literal child holds
+        // the min or max bound; the NULL semantic is conveyed by the node
+        // type itself (see create_vbin_predicate in runtime_filter/utils.cpp).
+        rf_contains_null = true;
+    }
+
+    for (const auto& pb : boundaries) {
+        if (_pruned_partition_ids.contains(pb.partition_id) ||
+            newly_pruned.contains(pb.partition_id)) {
+            continue;
+        }
+
+        // Skip partitions with NULL projection conservatively
+        if (pb.null_in_projection) {
+            continue;
+        }
+
+        // NULL handling:
+        //   A partition row whose key is NULL matches the RF iff `rf_contains_null`.
+        //   - only_null partition (rows are exclusively NULL): prunable iff !rf_contains_null.
+        //   - mixed (NULL + concrete values): if rf_contains_null, NULL rows alone
+        //     prevent pruning. Otherwise NULL rows can never match, so we ignore
+        //     the NULL portion and let the regular non-NULL intersection decide.
+        if (pb.only_null) {
+            if (!rf_contains_null) {
+                newly_pruned.insert(pb.partition_id);
+            }
+            continue;
+        }
+        if (pb.contains_null && rf_contains_null) {
+            continue;
+        }
+
+        std::visit(
+                [&](const auto& boundary_cvr) {
+                    using CvrType = std::decay_t<decltype(boundary_cvr)>;
+                    using CppType = typename CvrType::CppType;
+
+                    auto hybrid_set = impl->get_set_func();
+                    if (hybrid_set) {
+                        // IN filter: build a fixed-value CVR from the HybridSet
+                        auto rf_cvr = CvrType::create_empty_column_value_range(
+                                pb.is_nullable, boundary_cvr.precision(), boundary_cvr.scale());
+                        auto* iter = hybrid_set->begin();
+                        while (iter->has_next()) {
+                            const void* value = iter->get_value();
+                            if (value) {
+                                if constexpr (std::is_same_v<CppType, String>) {
+                                    // String HybridSets store StringRef*, but
+                                    // ColumnValueRange<String>::CppType is
+                                    // std::string. Construct from the bytes.
+                                    const auto* str_val = reinterpret_cast<const StringRef*>(value);
+                                    static_cast<void>(rf_cvr.add_fixed_value(
+                                            CppType(str_val->data, str_val->size)));
+                                } else if constexpr (std::is_same_v<CppType, StringRef>) {
+                                    const auto* str_val = reinterpret_cast<const StringRef*>(value);
+                                    static_cast<void>(rf_cvr.add_fixed_value(
+                                            CppType(str_val->data, str_val->size)));
+                                } else {
+                                    static_cast<void>(rf_cvr.add_fixed_value(
+                                            *reinterpret_cast<const CppType*>(value)));
+                                }
+                            }
+                            iter->next();
+                        }
+                        auto boundary_copy = boundary_cvr;
+                        boundary_copy.intersection(rf_cvr);
+                        if (boundary_copy.is_empty_value_range()) {
+                            newly_pruned.insert(pb.partition_id);
+                        }
+                    } else if ((impl->node_type() == TExprNodeType::BINARY_PRED ||
+                                impl->node_type() == TExprNodeType::NULL_AWARE_BINARY_PRED) &&
+                               impl->children().size() == 2 && impl->children()[1]->is_literal()) {
+                        // MinMax filter: binary pred with literal bound
+                        auto* literal = assert_cast<VLiteral*>(impl->children()[1].get());
+                        auto col_ptr = literal->get_column_ptr();
+                        auto data = col_ptr->get_data_at(0);
+                        CppType val {};
+                        if constexpr (std::is_same_v<CppType, String>) {
+                            // get_data_at returns a StringRef pointing at the
+                            // raw character bytes; ColumnValueRange<String>'s
+                            // CppType is std::string, so construct one from
+                            // the bytes rather than dereferencing as String.
+                            val = CppType(data.data, data.size);
+                        } else if constexpr (std::is_same_v<CppType, StringRef>) {
+                            val = CppType(data.data, data.size);
+                        } else {
+                            val = *reinterpret_cast<const CppType*>(data.data);
+                        }
+
+                        SQLFilterOp op = convert_opcode_to_filter_op(impl->op());
+                        if (op == FILTER_IN) {
+                            return; // unrecognized opcode, skip
+                        }
+
+                        CvrType rf_cvr(boundary_cvr.column_name(), pb.is_nullable,
+                                       boundary_cvr.precision(), boundary_cvr.scale());
+                        static_cast<void>(rf_cvr.add_range(op, val));
+
+                        auto boundary_copy = boundary_cvr;
+                        boundary_copy.intersection(rf_cvr);
+                        if (boundary_copy.is_empty_value_range()) {
+                            newly_pruned.insert(pb.partition_id);
+                        }
+                    }
+                },
+                pb.boundary_cvr);
+    }
+}
+
+int64_t RuntimeFilterPartitionPruner::prune_by_runtime_filters(
+        const ParsedPartitionBoundaries& parsed, const VExprContextSPtrs& conjuncts,
+        const std::vector<TRuntimeFilterDesc>& rf_descs, int scan_node_id) {
+    if (parsed.empty()) {
+        return 0;
+    }
+    const auto& partition_column_slot_ids = parsed.partition_column_slot_ids();
+
+    // Build filter_id -> monotonicity map
+    std::unordered_map<int, TTargetExprMonotonicity::type> filter_id_to_monotonicity;
+    for (const auto& desc : rf_descs) {
+        if (desc.__isset.planId_to_target_monotonicity) {
+            auto it = desc.planId_to_target_monotonicity.find(scan_node_id);
+            if (it != desc.planId_to_target_monotonicity.end()) {
+                filter_id_to_monotonicity[desc.filter_id] = it->second;
+            }
+        }
+    }
+
+    // This function is serialized by _conjuncts_lock in the caller, so our reads
+    // of _pruned_partition_ids never race with our writes below. The only concurrent
+    // readers are is_partition_pruned() calls (under shared_lock), which are
+    // properly synchronized by the unique_lock we take when inserting.
+    phmap::flat_hash_set<int64_t> newly_pruned;
+
+    for (const auto& conjunct_ctx : conjuncts) {
+        VExprSPtr root = conjunct_ctx->root();
+        if (!root->is_rf_wrapper()) {
+            continue;
+        }
+
+        VExprSPtr impl = root->get_impl();
+        if (!impl) {
+            continue;
+        }
+
+        if (impl->children().empty()) {
+            continue;
+        }
+
+        auto* wrapper_root = assert_cast<VRuntimeFilterWrapper*>(root.get());
+        int filter_id = wrapper_root->filter_id();
+
+        VExprSPtr target_subtree = impl->children()[0];
+
+        // Identity case: target is a simple SlotRef on a partition column
+        if (target_subtree->is_slot_ref()) {
+            auto* slot_ref = assert_cast<VSlotRef*>(target_subtree.get());
+            SlotId slot_id = slot_ref->slot_id();
+            if (!partition_column_slot_ids.contains(slot_id)) {
+                continue;
+            }
+
+            const auto& slot_to_boundaries = parsed.slot_to_boundaries();
+            auto boundaries_it = slot_to_boundaries.find(slot_id);
+            if (boundaries_it == slot_to_boundaries.end()) {
+                continue;
+            }
+
+            _try_prune_by_single_rf(boundaries_it->second, impl, newly_pruned);
+        } else {
+            // Non-identity case: check if it's a monotonic target
+            auto mono_it = filter_id_to_monotonicity.find(filter_id);
+            if (mono_it == filter_id_to_monotonicity.end() ||
+                mono_it->second == TTargetExprMonotonicity::NON_MONOTONIC) {
+                continue;
+            }
+
+            const VSlotRef* leaf_slot = find_unique_slot_ref(target_subtree.get());
+            if (!leaf_slot) {
+                continue;
+            }
+
+            SlotId leaf_slot_id = leaf_slot->slot_id();
+            if (!partition_column_slot_ids.contains(leaf_slot_id)) {
+                continue;
+            }
+
+            int leaf_column_id = leaf_slot->column_id();
+            TTargetExprMonotonicity::type direction = mono_it->second;
+
+            const std::vector<ParsedBoundary>* projected =
+                    parsed.get_or_compute_projection(filter_id, target_subtree, leaf_slot_id,
+                                                     leaf_column_id, direction, conjunct_ctx.get());
+
+            if (projected == nullptr || projected->empty()) {
+                continue;
+            }
+
+            _try_prune_by_single_rf(*projected, impl, newly_pruned);
+        }
+    }
+
+    auto count = static_cast<int64_t>(newly_pruned.size());
+    if (count > 0) {
+        std::unique_lock lock(_prune_mutex);
+        for (int64_t pid : newly_pruned) {
+            _pruned_partition_ids.insert(pid);
+        }
+    }
+    return count;
+}
+// NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
+
+bool RuntimeFilterPartitionPruner::is_partition_pruned(int64_t partition_id) const {
+    std::shared_lock lock(_prune_mutex);
+    return _pruned_partition_ids.contains(partition_id);
+}
+
+int64_t RuntimeFilterPartitionPruner::pruned_partition_count() const {
+    std::shared_lock lock(_prune_mutex);
+    return static_cast<int64_t>(_pruned_partition_ids.size());
+}
+
+} // namespace doris
diff --git a/be/src/exec/runtime_filter/runtime_filter_partition_pruner.h b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.h
new file mode 100644
index 0000000..31712ae
--- /dev/null
+++ b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.h
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/global_types.h"
+#include "core/data_type/data_type.h"
+#include "exec/common/hash_table/phmap_fwd_decl.h"
+#include "exprs/vexpr_fwd.h"
+#include "storage/olap_scan_common.h"
+
+namespace doris {
+
+class SlotDescriptor;
+class VExprContext;
+struct TPartitionBoundary;
+struct TRuntimeFilterDesc;
+struct TTargetExprMonotonicity;
+
+// Parsed representation of one partition boundary for one slot column.
+struct ParsedBoundary {
+    int64_t partition_id = 0;
+    SlotId slot_id = 0;
+    bool is_nullable = false;
+    ColumnValueRangeType boundary_cvr;
+    // True if the partition's value set is exactly {NULL} (e.g. LIST
+    // partition whose only key is NULL). The CVR alone cannot encode
+    // "only NULL" -- it stays as the whole range with contain_null=true
+    // -- so we track it explicitly to enable accurate pruning.
+    bool only_null = false;
+    // True if the partition's value set includes NULL (covers both
+    // only-NULL and mixed LIST partitions). Tracked separately because
+    // ColumnValueRange::set_contain_null(true) destructively clears the
+    // fixed-value set, so we cannot stash the NULL flag inside the CVR
+    // alongside the concrete values.
+    bool contains_null = false;
+    // For projected boundaries: true if the target_expr produced NULL on this
+    // partition's input value (cannot prune conservatively).
+    bool null_in_projection = false;
+};
+
+// Immutable, fragment-shared parse result of TPartitionBoundary list.
+//
+// Parsing is expensive (constructs VLiteral per literal, materializes a
+// ColumnPtr, builds ColumnValueRange) and depends only on plan-time data.
+// All pipeline instances of the same fragment share one parse, performed
+// in OperatorX::prepare() which is single-threaded fragment setup.
+class ParsedPartitionBoundaries {
+public:
+    ParsedPartitionBoundaries() = default;
+
+    // Build the parse result from the thrift `boundaries` list. Caller must
+    // ensure this is invoked at most once per instance (OperatorX::prepare()
+    // is the natural call site).
+    void parse(const std::vector<TPartitionBoundary>& boundaries,
+               const phmap::flat_hash_map<int, SlotDescriptor*>& slot_descs);
+
+    bool empty() const { return _partition_column_slot_ids.empty(); }
+    int64_t total_partitions() const { return _total_partition_count; }
+
+    const std::unordered_map<SlotId, std::vector<ParsedBoundary>>& slot_to_boundaries() const {
+        return _slot_to_boundaries;
+    }
+    const std::unordered_set<SlotId>& partition_column_slot_ids() const {
+        return _partition_column_slot_ids;
+    }
+
+    // Lazily compute projected boundaries for a non-identity monotonic target.
+    // `target_expr` is `impl->children()[0]` of the RF wrapper (a sub-tree of
+    // the conjunct). `leaf_slot_id` is the unique VSlotRef leaf inside it
+    // (FE asserted target_expr has exactly one input slot). `leaf_column_id`
+    // is that slot ref's `column_id()` -- the position in the runtime block.
+    // `direction` is the FE-side cumulative monotonicity. `ctx` is the
+    // conjunct's VExprContext (used to execute the sub-expression).
+    //
+    // Returns nullptr if projection is unsupported (e.g. non-RANGE partition,
+    // unsupported primitive type) -- caller then skips this RF.
+    //
+    // Direction:
+    //   MONOTONIC_INCREASING: projected lo and hi keep their roles
+    //   MONOTONIC_DECREASING: swap (projected lo, hi) -> (hi, lo)
+    //
+    // NULL: any input value that projects to NULL marks
+    //   null_in_projection=true; the existing _try_prune_by_single_rf
+    //   conservatively keeps the partition.
+    const std::vector<ParsedBoundary>* get_or_compute_projection(
+            int filter_id, const VExprSPtr& target_expr, SlotId leaf_slot_id, int leaf_column_id,
+            TTargetExprMonotonicity::type direction, VExprContext* ctx) const;
+
+private:
+    std::unordered_map<SlotId, std::vector<ParsedBoundary>> _slot_to_boundaries;
+    std::unordered_set<SlotId> _partition_column_slot_ids;
+    int64_t _total_partition_count = 0;
+    std::unordered_map<SlotId, DataTypePtr> _slot_data_types;
+
+    mutable std::mutex _projection_cache_mutex;
+    mutable std::unordered_map<int /*filter_id*/, std::vector<ParsedBoundary>> _projection_cache;
+};
+
+// Per-instance pruning state for runtime-filter partition pruning.
+//
+// Holds the set of partition IDs already pruned for this scan instance and
+// nothing else: the parsed boundaries are shared per-fragment and reached via
+// `OperatorXBase::parsed_partition_boundaries()`. The owner (ScanLocalStateBase)
+// passes the parsed object into `prune_by_runtime_filters` on each call.
+//
+// Thread safety: `is_partition_pruned()` is safe to call concurrently with
+// `prune_by_runtime_filters()` via an internal shared_mutex.
+class RuntimeFilterPartitionPruner {
+public:
+    RuntimeFilterPartitionPruner() = default;
+
+    // Evaluate RF conjuncts against the given parsed boundaries and mark
+    // pruned partitions on this per-instance state. Returns the number of
+    // *newly* pruned partitions in this call.
+    int64_t prune_by_runtime_filters(const ParsedPartitionBoundaries& parsed,
+                                     const VExprContextSPtrs& conjuncts,
+                                     const std::vector<TRuntimeFilterDesc>& rf_descs,
+                                     int scan_node_id);
+
+    // Thread-safe query: is the given partition_id pruned?
+    bool is_partition_pruned(int64_t partition_id) const;
+
+    // Number of partitions currently marked as pruned.
+    int64_t pruned_partition_count() const;
+
+private:
+    phmap::flat_hash_set<int64_t> _pruned_partition_ids;
+    mutable std::shared_mutex _prune_mutex;
+
+    // Try to prune partitions using a single RF's impl expression on the given boundaries.
+    // Adds newly pruned partition IDs to `newly_pruned`.
+    void _try_prune_by_single_rf(const std::vector<ParsedBoundary>& boundaries,
+                                 const VExprSPtr& impl,
+                                 phmap::flat_hash_set<int64_t>& newly_pruned);
+};
+
+} // namespace doris
diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp
index 96e3e12..57af3ee 100644
--- a/be/src/exec/scan/olap_scanner.cpp
+++ b/be/src/exec/scan/olap_scanner.cpp
@@ -610,6 +610,13 @@
     return Status::OK();
 }
 
+bool OlapScanner::check_partition_pruned() const {
+    if (!_local_state) {
+        return false;
+    }
+    return _local_state->is_partition_pruned(_tablet_reader_params.tablet->partition_id());
+}
+
 doris::TabletStorageType OlapScanner::get_storage_type() {
     if (config::is_cloud_mode()) {
         // we don't have cold storage in cloud mode, all storage is treated as local
diff --git a/be/src/exec/scan/olap_scanner.h b/be/src/exec/scan/olap_scanner.h
index 2187be4..ab84525 100644
--- a/be/src/exec/scan/olap_scanner.h
+++ b/be/src/exec/scan/olap_scanner.h
@@ -77,6 +77,8 @@
 
     doris::TabletStorageType get_storage_type() override;
 
+    bool check_partition_pruned() const override;
+
     void update_realtime_counters() override;
 
 protected:
diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h
index 14d1b9e..bf38dda 100644
--- a/be/src/exec/scan/scanner.h
+++ b/be/src/exec/scan/scanner.h
@@ -181,6 +181,10 @@
         return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
     }
 
+    // Returns true if this scanner's partition has been pruned by a runtime filter.
+    // Overridden by OlapScanner to check partition pruning state.
+    virtual bool check_partition_pruned() const { return false; }
+
     bool need_to_close() const { return _need_to_close; }
 
     void mark_to_need_to_close() {
diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp
index 7abe192..5296fa4 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -203,6 +203,9 @@
                              << rf_status.to_string();
             }
 
+            // After processing late RFs, check if this scanner's partition was pruned.
+            if (!eos && scanner->check_partition_pruned()) { eos = true; }
+
             size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
             if (ctx->low_memory_mode()) {
                 ctx->clear_free_blocks();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 5192df1..9807ce5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -22,6 +22,9 @@
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
 import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.Expression;
@@ -33,6 +36,7 @@
 import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.DistributionMode;
 import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
 import org.apache.doris.planner.ScanNode;
@@ -41,6 +45,7 @@
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
 import org.apache.doris.thrift.TRuntimeFilterType;
+import org.apache.doris.thrift.TTargetExprMonotonicity;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -170,6 +175,7 @@
             List<Expr> targetExprList = new ArrayList<>();
             List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new ArrayList<>();
             List<ScanNode> scanNodeList = new ArrayList<>();
+            List<RuntimeFilter> targetFilterList = new ArrayList<>();
             boolean hasInvalidTarget = false;
             for (RuntimeFilter filter : group) {
                 Slot curTargetSlot = filter.getTargetSlot();
@@ -204,6 +210,7 @@
                 TupleId targetTupleId = targetSlotRef.getDesc().getParentId();
                 SlotId targetSlotId = targetSlotRef.getSlotId();
                 scanNodeList.add(scanNode);
+                targetFilterList.add(filter);
                 targetExprList.add(targetExpr);
                 targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)));
             }
@@ -228,6 +235,9 @@
                     Expr targetExpr = targetExprList.get(i);
                     origFilter.addTarget(new RuntimeFilterTarget(
                             scanNode, targetExpr, true, isLocalTarget));
+                    TTargetExprMonotonicity mono = classifyMonotonicityForPartitionPruning(
+                            targetFilterList.get(i).getTargetExpression(), scanNode);
+                    origFilter.setTargetMonotonicity(scanNode.getId(), mono);
                 }
                 origFilter.setBitmapFilterNotIn(head.isBitmapFilterNotIn());
                 origFilter.setBloomFilterSizeCalculatedByNdv(head.isBloomFilterSizeCalculatedByNdv());
@@ -322,6 +332,9 @@
                     Expr targetExpr = targetExprList.get(i);
                     origFilter.addTarget(new RuntimeFilterTarget(
                             scanNode, targetExpr, true, isLocalTarget));
+                    TTargetExprMonotonicity mono = classifyMonotonicityForPartitionPruning(
+                            filter.getTargetExpressions().get(i), scanNode);
+                    origFilter.setTargetMonotonicity(scanNode.getId(), mono);
                 }
                 origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
                 origFilter.setBloomFilterSizeCalculatedByNdv(filter.isBloomFilterSizeCalculatedByNdv());
@@ -350,4 +363,43 @@
         origFilter.extractTargetsPosition();
         return origFilter;
     }
+
+    /**
+     * Classify whether a runtime-filter target expression can drive partition
+     * pruning on the given scan node.
+     *
+     * <p>Conservative implementation: only identity {@link Slot} references on a
+     * partition column are treated as monotonic increasing. Although Nereids
+     * exposes a {@code Monotonic} interface, its
+     * {@code isMonotonic(Literal lower, Literal upper)} contract is range-aware
+     * and most current implementations do not yet take the bounds into account
+     * correctly; treating every {@code instanceof Monotonic} node as
+     * unconditionally monotonic over the full domain would be unsound. Once the
+     * {@code Monotonic} implementations are completed/audited we can revisit
+     * this and walk through monotonic chains here.
+     */
+    static TTargetExprMonotonicity classifyMonotonicityForPartitionPruning(
+            Expression nereidsExpr, org.apache.doris.planner.PlanNode scanNode) {
+        if (!(scanNode instanceof OlapScanNode)) {
+            return TTargetExprMonotonicity.NON_MONOTONIC;
+        }
+        if (!(nereidsExpr instanceof Slot)) {
+            return TTargetExprMonotonicity.NON_MONOTONIC;
+        }
+        OlapTable table = ((OlapScanNode) scanNode).getOlapTable();
+        if (table == null) {
+            return TTargetExprMonotonicity.NON_MONOTONIC;
+        }
+        PartitionType partType = table.getPartitionInfo().getType();
+        if (partType != PartitionType.RANGE && partType != PartitionType.LIST) {
+            return TTargetExprMonotonicity.NON_MONOTONIC;
+        }
+        String colName = ((Slot) nereidsExpr).getName();
+        for (Column partCol : table.getPartitionInfo().getPartitionColumns()) {
+            if (partCol.getName().equalsIgnoreCase(colName)) {
+                return TTargetExprMonotonicity.MONOTONIC_INCREASING;
+            }
+        }
+        return TTargetExprMonotonicity.NON_MONOTONIC;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 8a65687..d60a29c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -21,6 +21,8 @@
 import org.apache.doris.analysis.ExprToSqlVisitor;
 import org.apache.doris.analysis.ExprToThriftVisitor;
 import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.MaxLiteral;
+import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.SlotRef;
@@ -36,13 +38,16 @@
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.IndexToThriftConvertor;
+import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Partition.PartitionState;
 import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Tablet;
@@ -66,12 +71,14 @@
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprNode;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TNormalizedOlapScanNode;
 import org.apache.doris.thrift.TNormalizedPlanNode;
 import org.apache.doris.thrift.TOlapScanNode;
 import org.apache.doris.thrift.TOlapTableIndex;
 import org.apache.doris.thrift.TPaloScanRange;
+import org.apache.doris.thrift.TPartitionBoundary;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TPrimitiveType;
@@ -1211,9 +1218,169 @@
 
         msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds));
 
+        // Populate partition boundaries for BE-side runtime filter partition pruning.
+        // Only serialize when this scan node actually has at least one runtime
+        // filter whose target expression can drive partition pruning (identity
+        // or monotonic function on a partition column), so we don't bloat
+        // thrift for tables with many partitions but no usable RF target.
+        // Gated by session variable `enable_runtime_filter_partition_prune`.
+        ConnectContext rfPruneCtx = ConnectContext.get();
+        if (rfPruneCtx != null
+                && rfPruneCtx.getSessionVariable().isEnableRuntimeFilterPartitionPrune()
+                && hasRfDrivingPartitionPruning()) {
+            setPartitionBoundaries(msg.olap_scan_node);
+        }
+
         super.toThrift(msg);
     }
 
+    private boolean hasRfDrivingPartitionPruning() {
+        if (selectedPartitionIds == null || selectedPartitionIds.size() < 2) {
+            return false;
+        }
+        PlanNodeId myId = this.getId();
+        for (RuntimeFilter rf : runtimeFilters) {
+            if (rf.canPrunePartitionsFor(myId)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void setPartitionBoundaries(TOlapScanNode olapScanNode) {
+        PartitionInfo partitionInfo = olapTable.getPartitionInfo();
+        PartitionType partType = partitionInfo.getType();
+        if (partType != PartitionType.RANGE && partType != PartitionType.LIST) {
+            return;
+        }
+        List<Column> partColumns = partitionInfo.getPartitionColumns();
+        if (partColumns.isEmpty()) {
+            return;
+        }
+
+        // Build partition column name → slot ID mapping
+        Map<String, Integer> partColToSlotId = Maps.newHashMap();
+        for (SlotDescriptor slot : desc.getSlots()) {
+            if (slot.getColumn() == null) {
+                continue;
+            }
+            for (Column partCol : partColumns) {
+                if (slot.getColumn().getName().equalsIgnoreCase(partCol.getName())) {
+                    partColToSlotId.put(partCol.getName(), slot.getId().asInt());
+                    break;
+                }
+            }
+        }
+        if (partColToSlotId.isEmpty()) {
+            return;
+        }
+
+        List<TPartitionBoundary> boundaries = new ArrayList<>();
+        for (Long partitionId : selectedPartitionIds) {
+            PartitionItem item = partitionInfo.getItem(partitionId);
+            if (item == null) {
+                continue;
+            }
+            if (item instanceof RangePartitionItem) {
+                addRangeBoundaries(boundaries, partitionId, (RangePartitionItem) item,
+                        partColumns, partColToSlotId);
+            } else if (item instanceof ListPartitionItem) {
+                addListBoundaries(boundaries, partitionId, (ListPartitionItem) item,
+                        partColumns, partColToSlotId);
+            }
+        }
+        if (!boundaries.isEmpty()) {
+            olapScanNode.setPartitionBoundaries(boundaries);
+        }
+    }
+
+    private void addRangeBoundaries(List<TPartitionBoundary> boundaries, long partitionId,
+            RangePartitionItem rangeItem, List<Column> partColumns,
+            Map<String, Integer> partColToSlotId) {
+        // We always project a (possibly multi-column) RANGE partition onto its
+        // first partition column, since the BE pruner only consumes per-column
+        // boundaries. Projection rules (lex compare semantics):
+        //
+        //   single column [L, U):
+        //       projection = [L, U)              → range_end_inclusive = false
+        //
+        //   multi-column [(L1, L2, ...), (U1, U2, ...)):
+        //       k1 = L1 is reachable (inner tuple can be ≥ (L2, ...))
+        //       k1 ∈ (L1, U1) is fully reachable
+        //       k1 = U1 may be reachable via inner tuple < (U2, ...)
+        //       projection = [L1, U1] (CLOSED both ends, conservative)
+        //                                        → range_end_inclusive = true
+        //
+        // The half-open form [L1, U1) for multi-column would be a strict
+        // UNDER-approximation. Example: partition [(1,1), (1,5)) projects to
+        // {1}, but [1, 1) is empty and would let the BE wrongly prune the
+        // partition for an RF like k1 = 1.
+        String colName = partColumns.get(0).getName();
+        Integer slotId = partColToSlotId.get(colName);
+        if (slotId == null) {
+            return;
+        }
+        com.google.common.collect.Range<PartitionKey> range = rangeItem.getItems();
+        TPartitionBoundary boundary = new TPartitionBoundary();
+        boundary.setPartitionId(partitionId);
+        boundary.setSlotId(slotId);
+        if (range.hasLowerBound() && !range.lowerEndpoint().isMinValue()) {
+            LiteralExpr lower = range.lowerEndpoint().getKeys().get(0);
+            if (!(lower instanceof MaxLiteral)) {
+                boundary.setRangeStart(
+                        ExprToThriftVisitor.treeToThrift(lower).getNodes().get(0));
+            }
+        }
+        if (range.hasUpperBound() && !range.upperEndpoint().isMaxValue()) {
+            LiteralExpr upper = range.upperEndpoint().getKeys().get(0);
+            if (!(upper instanceof MaxLiteral)) {
+                boundary.setRangeEnd(
+                        ExprToThriftVisitor.treeToThrift(upper).getNodes().get(0));
+            }
+        }
+        if (partColumns.size() > 1) {
+            boundary.setRangeEndInclusive(true);
+        }
+        boundaries.add(boundary);
+    }
+
+    private void addListBoundaries(List<TPartitionBoundary> boundaries, long partitionId,
+            ListPartitionItem listItem, List<Column> partColumns,
+            Map<String, Integer> partColToSlotId) {
+        if (listItem.isDefaultPartition()) {
+            return;
+        }
+        List<PartitionKey> partitionKeys = listItem.getItems();
+        // For LIST partitions, emit per-column distinct value sets. NULL keys
+        // are emitted as NULL_LITERAL TExprNode so the BE parser can translate
+        // them into ColumnValueRange::set_contain_null(true) rather than
+        // treating NULL as an ordinary fixed value (which would crash the
+        // typed value extractor in the parser).
+        for (int i = 0; i < partColumns.size(); i++) {
+            String colName = partColumns.get(i).getName();
+            Integer slotId = partColToSlotId.get(colName);
+            if (slotId == null) {
+                continue;
+            }
+            TPartitionBoundary boundary = new TPartitionBoundary();
+            boundary.setPartitionId(partitionId);
+            boundary.setSlotId(slotId);
+            List<TExprNode> listValues = new ArrayList<>(partitionKeys.size());
+            for (PartitionKey pk : partitionKeys) {
+                LiteralExpr literalExpr = pk.getKeys().get(i);
+                if (literalExpr.isNullLiteral()) {
+                    listValues.add(ExprToThriftVisitor.treeToThrift(
+                            NullLiteral.create(literalExpr.getType())).getNodes().get(0));
+                } else {
+                    listValues.add(
+                            ExprToThriftVisitor.treeToThrift(literalExpr).getNodes().get(0));
+                }
+            }
+            boundary.setListValues(listValues);
+            boundaries.add(boundary);
+        }
+    }
+
     @Override
     public void normalize(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) {
         TNormalizedOlapScanNode normalizedOlapScanNode = new TNormalizedOlapScanNode();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index f4ea16c..4eb152f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -31,6 +31,7 @@
 import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
 import org.apache.doris.thrift.TRuntimeFilterDesc;
 import org.apache.doris.thrift.TRuntimeFilterType;
+import org.apache.doris.thrift.TTargetExprMonotonicity;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -39,6 +40,7 @@
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -133,6 +135,15 @@
 
     private boolean singleEq = false;
 
+    // Per-target monotonicity for BE-side runtime-filter partition pruning,
+    // keyed by the target scan node's plan id. Filled by the Nereids
+    // RuntimeFilterTranslator at translation time (where the original
+    // Nereids Expression is still available for monotonicity classification);
+    // legacy planner cannot redo the classification because it only has
+    // analysis.Expr. Read by toThrift() and canPrunePartitionsFor().
+    private final Map<PlanNodeId, TTargetExprMonotonicity> targetMonotonicityByScanId
+            = new HashMap<>();
+
     /**
      * Internal representation of a runtime filter target.
      */
@@ -310,9 +321,52 @@
         } else if (builderNode instanceof SetOperationNode) {
             tFilter.setNullAware(true);
         }
+
+        // Per-target monotonicity for BE-side partition pruning. Populated
+        // upstream by RuntimeFilterTranslator (where the original Nereids
+        // Expression is available); we only forward it to the BE here.
+        // Gated by session variable `enable_runtime_filter_partition_prune`.
+        ConnectContext rfPruneCtx = ConnectContext.get();
+        boolean enableRfPartitionPrune = rfPruneCtx != null
+                && rfPruneCtx.getSessionVariable().isEnableRuntimeFilterPartitionPrune();
+        if (enableRfPartitionPrune && !targetMonotonicityByScanId.isEmpty()) {
+            Map<Integer, TTargetExprMonotonicity> monoMap = new HashMap<>();
+            for (Map.Entry<PlanNodeId, TTargetExprMonotonicity> e
+                    : targetMonotonicityByScanId.entrySet()) {
+                if (e.getValue() != TTargetExprMonotonicity.NON_MONOTONIC) {
+                    monoMap.put(e.getKey().asInt(), e.getValue());
+                }
+            }
+            if (!monoMap.isEmpty()) {
+                tFilter.setPlanIdToTargetMonotonicity(monoMap);
+            }
+        }
+
         return tFilter;
     }
 
+    /**
+     * Record monotonicity for a target. Called by RuntimeFilterTranslator after
+     * the corresponding RuntimeFilterTarget has been added.
+     */
+    public void setTargetMonotonicity(PlanNodeId scanNodeId, TTargetExprMonotonicity m) {
+        targetMonotonicityByScanId.put(scanNodeId, m);
+    }
+
+    /**
+     * Returns true iff this RF can drive partition pruning for the given target
+     * scan node. Used by OlapScanNode.toThrift to decide whether it is worth
+     * serializing partition_boundaries to BE. Mirrors exactly the condition
+     * that toThrift() emits planId_to_target_monotonicity for: monotonic and
+     * grounded on a partition column. When BE later supports more shapes, the
+     * single source of truth for that decision lives in
+     * RuntimeFilterTranslator.classifyMonotonicityForPartitionPruning.
+     */
+    public boolean canPrunePartitionsFor(PlanNodeId scanNodeId) {
+        TTargetExprMonotonicity m = targetMonotonicityByScanId.get(scanNodeId);
+        return m != null && m != TTargetExprMonotonicity.NON_MONOTONIC;
+    }
+
     public boolean hasTargets() {
         return !targets.isEmpty();
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 843087f..91ad555 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -696,6 +696,49 @@
   6: optional string bloom_literal
 }
 
+// Partition boundary descriptor for BE-side runtime filter partition pruning.
+// FE sends only partitions that are candidates for pruning; partitions FE does
+// not want pruned (e.g. default catch-all partitions) are simply omitted.
+//
+// Partition type is inferred from which optional fields are set:
+//   - range_start / range_end set  →  Range partition
+//   - list_values set              →  List partition
+//
+// For Range partitions:
+//   - range_start absent  →  no lower-bound constraint (negative infinity)
+//   - range_end   absent  →  no upper-bound constraint (MAXVALUE / positive infinity)
+struct TPartitionBoundary {
+  1: optional Types.TPartitionId partition_id
+  // slot_id of the partition column
+  2: optional Types.TSlotId slot_id
+
+  // Range partition: closed lower bound; absent means unbounded below
+  3: optional Exprs.TExprNode range_start
+  // Range partition: upper bound. By default (range_end_inclusive=false) the
+  // bound is OPEN, i.e. the column range is [range_start, range_end), which
+  // matches Doris RANGE partition's `VALUES [..., ...)` syntax for the
+  // single-column case. Absent means unbounded above (MAXVALUE).
+  4: optional Exprs.TExprNode range_end
+
+  // List partition: set of concrete values in this partition. A NULL_LITERAL
+  // entry indicates the partition logically contains NULL rows for this column;
+  // the BE pruner translates that into ColumnValueRange::set_contain_null(true)
+  // rather than treating it as an ordinary fixed value.
+  5: optional list<Exprs.TExprNode> list_values
+
+  // When true, treat `range_end` as a CLOSED upper bound, i.e. the projected
+  // column range is [range_start, range_end]. Used when projecting a
+  // multi-column RANGE partition onto its first column: a partition like
+  // [(L1, L2, ...), (U1, U2, ...)) projects to the first column as
+  // [L1, U1] (both ends closed) — for the L1 == U1 case the projection is the
+  // singleton {L1}, and for L1 < U1 the value U1 is reachable via inner-tuple
+  // values of the second+ column. The original half-open form [L1, U1) would
+  // be a strict UNDER-approximation and could wrongly prune the partition.
+  // The single-column case keeps the default open form to preserve exact
+  // Doris partition semantics.
+  6: optional bool range_end_inclusive = false
+}
+
 struct TMetaScanRange {
   1: optional Types.TMetadataType metadata_type
   2: optional TIcebergMetadataParams iceberg_params // deprecated
@@ -947,6 +990,13 @@
   24: optional bool enable_mor_value_predicate_pushdown
   // Read MOR table as DUP table: skip merge, skip delete sign
   25: optional bool read_mor_as_dup
+  // Partition-id → tablet-id list mapping; used together with partition_boundaries
+  // for BE-side runtime filter partition pruning.
+  26: optional map<Types.TPartitionId, list<Types.TTabletId>> partition_to_tablets
+  // Partition boundary descriptors for BE-side runtime filter partition pruning.
+  // Only partitions that are candidates for pruning are included; partitions FE
+  // does not want pruned (e.g. default catch-all) are omitted from this list.
+  27: optional list<TPartitionBoundary> partition_boundaries
 }
 
 struct TEqJoinCondition {
@@ -1429,6 +1479,18 @@
   MIN_MAX = 4
 }
 
+// Monotonicity of a runtime filter's target expression, used by BE-side
+// partition pruning.  For Range partitions, only MONOTONIC_INCREASING or
+// MONOTONIC_DECREASING target expressions allow safe boundary transformation
+// and pruning.  List partitions can always be pruned regardless of monotonicity.
+// When the target expression is a plain SlotRef (identity), FE may omit this
+// field; BE treats an absent value as NON_MONOTONIC (conservative).
+enum TTargetExprMonotonicity {
+  NON_MONOTONIC = 0,
+  MONOTONIC_INCREASING = 1,
+  MONOTONIC_DECREASING = 2
+}
+
 struct TTopnFilterDesc {
   // topn node id
   1: required i32 source_node_id 
@@ -1495,6 +1557,14 @@
   16: optional bool sync_filter_size; // Deprecated
   
   17: optional bool build_bf_by_runtime_size;
+
+  // Per-target monotonicity for BE-side partition pruning, keyed by target
+  // plan-node ID.  For Range partitions, only monotonic target expressions
+  // allow boundary transformation; List partitions can always be pruned
+  // regardless.  Absent entry / NON_MONOTONIC → BE skips Range partition
+  // pruning for that target.  When the target expression is a plain SlotRef,
+  // FE should set MONOTONIC_INCREASING (identity is trivially monotonic).
+  18: optional map<Types.TPlanNodeId, TTargetExprMonotonicity> planId_to_target_monotonicity;
 }
 
 
diff --git a/regression-test/data/query_p0/runtime_filter/rf_partition_pruning.out b/regression-test/data/query_p0/runtime_filter/rf_partition_pruning.out
new file mode 100644
index 0000000..e4801bc
--- /dev/null
+++ b/regression-test/data/query_p0/runtime_filter/rf_partition_pruning.out
@@ -0,0 +1,126 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !range_int_in --
+1	10	a
+3	50	c
+5	90	e
+
+-- !range_int_minmax --
+1	10	a
+3	50	c
+5	90	e
+
+-- !range_date_in --
+1	2024-01-15	jan
+2	2024-02-20	feb
+3	2024-03-10	mar
+
+-- !range_date_minmax --
+1	2024-01-15	jan
+2	2024-02-20	feb
+3	2024-03-10	mar
+
+-- !list_str_in --
+1	Beijing	a
+2	Beijing	b
+3	Beijing	c
+
+-- !list_int_in --
+1	1	a
+2	1	b
+5	3	e
+6	3	f
+
+-- !no_pruning --
+13	250	m
+18	350	r
+3	50	c
+8	150	h
+
+-- !range_int_agg --
+3	10	90
+
+-- !list_str_agg --
+Beijing	3
+
+-- !list_int_multi --
+3	2	c
+4	2	d
+7	4	g
+8	4	h
+
+-- !bigint_range_in --
+1	100	v0a
+11	250100	v5a
+12	275000	v5b
+2	25000	v0b
+
+-- !bigint_range_minmax --
+1	100	v0a
+11	250100	v5a
+12	275000	v5b
+2	25000	v0b
+
+-- !datetime_range_in --
+1	2024-02-15T10:00	jan
+
+-- !datetime_range_minmax --
+1	2024-02-15T10:00	jan
+
+-- !list_50_in --
+10	10	cat10
+25	25	cat25
+3	3	cat3
+40	40	cat40
+50	50	cat50
+
+-- !neg_range_in --
+7	-50	g
+8	-10	h
+
+-- !neg_range_minmax --
+7	-50	g
+8	-10	h
+
+-- !desc_dim_in --
+1	10	a
+3	50	c
+5	90	e
+
+-- !desc_dim_minmax --
+1	10	a
+3	50	c
+5	90	e
+
+-- !combined_rf --
+1	10	a
+3	50	c
+5	90	e
+
+-- !span_in --
+13	250	m
+3	50	c
+
+-- !span_minmax --
+13	250	m
+3	50	c
+
+-- !list_only_null_pruned --
+3	1	c
+4	1	d
+
+-- !list_only_null_minmax --
+3	1	c
+4	1	d
+
+-- !list_mixed_value_kept --
+2	5	b
+
+-- !list_mixed_nullsafe --
+1	\N
+
+-- !range_multi_first_col --
+1	1	1	a
+2	1	4	b
+3	1	5	c
+4	1	9	d
+
diff --git a/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy b/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy
new file mode 100644
index 0000000..8d3955e
--- /dev/null
+++ b/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy
@@ -0,0 +1,1348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.action.ProfileAction
+
+// Marked nonConcurrent: this suite asserts on FE query-profile counters
+// (TotalPartitionsForRFPruning / PartitionsPrunedByRuntimeFilter). Even
+// though each query embeds a unique UUID token to look up its own profile,
+// the FE profile list is bounded (max_query_profile_num) and shared across
+// sessions, so heavy parallel traffic could evict our profile before the
+// poller finds it. Running serially keeps the assertions deterministic.
+suite("rf_partition_pruning", "nonConcurrent") {
+    // Disable the legacy RuntimeFilterPruner: it strips RFs whose effectiveness
+    // cannot be statistically verified, and the small INSERT-only tables in
+    // this suite have no analyzed column stats, so the pruner would otherwise
+    // drop every RF and the partition-pruning counters under test would never
+    // populate.
+    sql "set enable_runtime_filter_prune=false;"
+
+    // ---- Profile utilities ----
+    def profileAction = new ProfileAction(context)
+
+    def getProfileByToken = { String token ->
+        String profileContent = ""
+        for (int attempt = 0; attempt < 15; attempt++) {
+            List profileData = profileAction.getProfileList()
+            for (final def profileItem in profileData) {
+                if (profileItem["Sql Statement"].toString().contains(token)) {
+                    profileContent = profileAction.getProfile(profileItem["Profile ID"].toString())
+                    break
+                }
+            }
+            if (profileContent != "") break
+            Thread.sleep(500)
+        }
+        return profileContent
+    }
+
+    def extractCounterSum = { String profileText, String counterName ->
+        def values = (profileText =~ /-\s*${counterName}:\s*(\d+)/).collect { it[1].toLong() }
+        return values.isEmpty() ? 0L : values.sum()
+    }
+
+    // Run a join query with a unique token to capture profile, then assert pruning counters.
+    // rfType: runtime_filter_type hint value
+    // expectedTotal: expected TotalPartitionsForRFPruning (minimum)
+    // expectedPruned: expected PartitionsPrunedByRuntimeFilter (minimum, 0 means exactly 0)
+    def assertPruningProfile = { String queryBody, String rfType, long expectedTotal, long expectedPruned ->
+        def token = UUID.randomUUID().toString()
+        sql """
+            SELECT /*+ SET_VAR(runtime_filter_type='${rfType}') */ "${token}", ${queryBody}
+        """
+        def profile = getProfileByToken(token)
+        def total = extractCounterSum(profile, "TotalPartitionsForRFPruning")
+        def pruned = extractCounterSum(profile, "PartitionsPrunedByRuntimeFilter")
+        logger.info("Profile [${token}]: total=${total}, pruned=${pruned}")
+        assertTrue(total >= expectedTotal, "TotalPartitionsForRFPruning: expected >= ${expectedTotal}, got ${total}")
+        if (expectedPruned == 0) {
+            assertTrue(pruned == 0, "PartitionsPrunedByRuntimeFilter: expected 0, got ${pruned}")
+        } else {
+            assertTrue(pruned >= expectedPruned, "PartitionsPrunedByRuntimeFilter: expected >= ${expectedPruned}, got ${pruned}")
+        }
+    }
+
+    // ============================================================
+    // Setup: Range-partitioned fact table (INT partition column)
+    // ============================================================
+    sql "drop table if exists rf_prune_range_int"
+    sql """
+        CREATE TABLE rf_prune_range_int (
+            id INT NOT NULL,
+            part_col INT NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY RANGE(part_col) (
+            PARTITION p1 VALUES [("0"), ("100")),
+            PARTITION p2 VALUES [("100"), ("200")),
+            PARTITION p3 VALUES [("200"), ("300")),
+            PARTITION p4 VALUES [("300"), ("400"))
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+
+    sql "drop table if exists rf_prune_dim_int"
+    sql """
+        CREATE TABLE rf_prune_dim_int (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+
+    sql """INSERT INTO rf_prune_range_int VALUES
+        (1, 10, 'a'), (2, 20, 'b'), (3, 50, 'c'), (4, 80, 'd'), (5, 90, 'e'),
+        (6, 110, 'f'), (7, 120, 'g'), (8, 150, 'h'), (9, 180, 'i'), (10, 190, 'j'),
+        (11, 210, 'k'), (12, 220, 'l'), (13, 250, 'm'), (14, 280, 'n'), (15, 290, 'o'),
+        (16, 310, 'p'), (17, 320, 'q'), (18, 350, 'r'), (19, 380, 's'), (20, 390, 't')
+    """
+
+    sql """INSERT INTO rf_prune_dim_int VALUES (10, 'x'), (50, 'y'), (90, 'z')"""
+
+    // ============================================================
+    // Setup: Range-partitioned fact table (DATE partition column)
+    // ============================================================
+    sql "drop table if exists rf_prune_range_date"
+    sql """
+        CREATE TABLE rf_prune_range_date (
+            id INT NOT NULL,
+            dt DATE NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY RANGE(dt) (
+            PARTITION p2024q1 VALUES [("2024-01-01"), ("2024-04-01")),
+            PARTITION p2024q2 VALUES [("2024-04-01"), ("2024-07-01")),
+            PARTITION p2024q3 VALUES [("2024-07-01"), ("2024-10-01")),
+            PARTITION p2024q4 VALUES [("2024-10-01"), ("2025-01-01"))
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+
+    sql "drop table if exists rf_prune_dim_date"
+    sql """
+        CREATE TABLE rf_prune_dim_date (
+            dim_dt DATE NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_dt) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+
+    sql """INSERT INTO rf_prune_range_date VALUES
+        (1, '2024-01-15', 'jan'), (2, '2024-02-20', 'feb'), (3, '2024-03-10', 'mar'),
+        (4, '2024-04-05', 'apr'), (5, '2024-05-15', 'may'), (6, '2024-06-20', 'jun'),
+        (7, '2024-07-10', 'jul'), (8, '2024-08-15', 'aug'), (9, '2024-09-20', 'sep'),
+        (10, '2024-10-05', 'oct'), (11, '2024-11-15', 'nov'), (12, '2024-12-20', 'dec')
+    """
+
+    sql """INSERT INTO rf_prune_dim_date VALUES
+        ('2024-01-15', 'x'), ('2024-02-20', 'y'), ('2024-03-10', 'z')
+    """
+
+    // ============================================================
+    // Setup: List-partitioned fact table (VARCHAR partition column)
+    // ============================================================
+    sql "drop table if exists rf_prune_list_str"
+    sql """
+        CREATE TABLE rf_prune_list_str (
+            id INT NOT NULL,
+            city VARCHAR(32) NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY LIST(city) (
+            PARTITION p_bj VALUES IN ("Beijing"),
+            PARTITION p_sh VALUES IN ("Shanghai"),
+            PARTITION p_gz VALUES IN ("Guangzhou"),
+            PARTITION p_sz VALUES IN ("Shenzhen")
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+
+    sql "drop table if exists rf_prune_dim_city"
+    sql """
+        CREATE TABLE rf_prune_dim_city (
+            dim_city VARCHAR(32) NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_city) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+
+    sql """INSERT INTO rf_prune_list_str VALUES
+        (1, 'Beijing', 'a'), (2, 'Beijing', 'b'), (3, 'Beijing', 'c'),
+        (4, 'Shanghai', 'd'), (5, 'Shanghai', 'e'), (6, 'Shanghai', 'f'),
+        (7, 'Guangzhou', 'g'), (8, 'Guangzhou', 'h'), (9, 'Guangzhou', 'i'),
+        (10, 'Shenzhen', 'j'), (11, 'Shenzhen', 'k'), (12, 'Shenzhen', 'l')
+    """
+
+    sql """INSERT INTO rf_prune_dim_city VALUES ('Beijing', 'x')"""
+
+    // ============================================================
+    // Setup: List-partitioned fact table (INT partition column)
+    // ============================================================
+    sql "drop table if exists rf_prune_list_int"
+    sql """
+        CREATE TABLE rf_prune_list_int (
+            id INT NOT NULL,
+            region_id INT NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY LIST(region_id) (
+            PARTITION p_r1 VALUES IN ("1"),
+            PARTITION p_r2 VALUES IN ("2"),
+            PARTITION p_r3 VALUES IN ("3"),
+            PARTITION p_r4 VALUES IN ("4"),
+            PARTITION p_r5 VALUES IN ("5")
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 2
+        PROPERTIES("replication_num" = "1")
+    """
+
+    sql "drop table if exists rf_prune_dim_region"
+    sql """
+        CREATE TABLE rf_prune_dim_region (
+            dim_region INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_region) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+
+    sql """INSERT INTO rf_prune_list_int VALUES
+        (1, 1, 'a'), (2, 1, 'b'),
+        (3, 2, 'c'), (4, 2, 'd'),
+        (5, 3, 'e'), (6, 3, 'f'),
+        (7, 4, 'g'), (8, 4, 'h'),
+        (9, 5, 'i'), (10, 5, 'j')
+    """
+
+    sql """INSERT INTO rf_prune_dim_region VALUES (1, 'x'), (3, 'y')"""
+
+    // ============================================================
+    // Session settings
+    // ============================================================
+    sql "set runtime_filter_type='IN_OR_BLOOM_FILTER,MIN_MAX'"
+    sql "set runtime_filter_wait_infinitely=true"
+    sql "set disable_join_reorder=true"
+    sql "set enable_profile=true"
+    sql "set profile_level=2"
+    // Fix parallelism to 1 so profile counters are deterministic across environments
+    sql "set parallel_pipeline_task_num=1"
+
+    // ============================================================
+    // Test 1: Range partition (INT) - IN filter prune
+    // RF {10, 50, 90} → only p1 [0,100) matches → prune 3 of 4
+    // ============================================================
+    order_qt_range_int_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_int f JOIN rf_prune_dim_int d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 4, 3)
+
+    // Test 2: Range partition (INT) - MinMax filter prune
+    // min=10, max=90 → only p1 matches → prune 3 of 4
+    order_qt_range_int_minmax """
+        SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_int f JOIN rf_prune_dim_int d ON f.part_col = d.dim_key",
+        "MIN_MAX", 4, 3)
+
+    // Test 3: Range partition (DATE) - IN filter prune
+    // Q1 dates only → prune Q2, Q3, Q4 → prune 3 of 4
+    order_qt_range_date_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.dt, f.value
+        FROM rf_prune_range_date f
+        JOIN rf_prune_dim_date d ON f.dt = d.dim_dt
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_date f JOIN rf_prune_dim_date d ON f.dt = d.dim_dt",
+        "IN_OR_BLOOM_FILTER", 4, 3)
+
+    // Test 4: Range partition (DATE) - MinMax filter prune
+    order_qt_range_date_minmax """
+        SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+            f.id, f.dt, f.value
+        FROM rf_prune_range_date f
+        JOIN rf_prune_dim_date d ON f.dt = d.dim_dt
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_date f JOIN rf_prune_dim_date d ON f.dt = d.dim_dt",
+        "MIN_MAX", 4, 3)
+
+    // Test 5: List partition (VARCHAR) - IN filter prune
+    // Only "Beijing" → prune 3 of 4
+    order_qt_list_str_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.city, f.value
+        FROM rf_prune_list_str f
+        JOIN rf_prune_dim_city d ON f.city = d.dim_city
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_list_str f JOIN rf_prune_dim_city d ON f.city = d.dim_city",
+        "IN_OR_BLOOM_FILTER", 4, 3)
+
+    // Test 6: List partition (INT) - IN filter prune
+    // Regions {1, 3} → prune 3 of 5
+    order_qt_list_int_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.region_id, f.value
+        FROM rf_prune_list_int f
+        JOIN rf_prune_dim_region d ON f.region_id = d.dim_region
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_list_int f JOIN rf_prune_dim_region d ON f.region_id = d.dim_region",
+        "IN_OR_BLOOM_FILTER", 5, 3)
+
+    // Test 7: No pruning - dim matches all partitions
+    sql "drop table if exists rf_prune_dim_all"
+    sql """
+        CREATE TABLE rf_prune_dim_all (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_all VALUES (50, 'a'), (150, 'b'), (250, 'c'), (350, 'd')"""
+
+    order_qt_no_pruning """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_all d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_int f JOIN rf_prune_dim_all d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 4, 0)
+
+    // Test 8: Join result correctness with aggregation
+    order_qt_range_int_agg """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            count(*), min(f.part_col), max(f.part_col)
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+    """
+
+    // Test 9: List partition with aggregation
+    order_qt_list_str_agg """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.city, count(*) as cnt
+        FROM rf_prune_list_str f
+        JOIN rf_prune_dim_city d ON f.city = d.dim_city
+        GROUP BY f.city
+    """
+
+    // Test 10: Multiple dim rows ({2, 4}) → prune 3 of 5
+    sql "drop table if exists rf_prune_dim_multi"
+    sql """
+        CREATE TABLE rf_prune_dim_multi (
+            dim_region INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_region) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_multi VALUES (2, 'a'), (4, 'b')"""
+
+    order_qt_list_int_multi """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.region_id, f.value
+        FROM rf_prune_list_int f
+        JOIN rf_prune_dim_multi d ON f.region_id = d.dim_region
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_list_int f JOIN rf_prune_dim_multi d ON f.region_id = d.dim_region",
+        "IN_OR_BLOOM_FILTER", 5, 3)
+
+    // ============================================================
+    // Extended tests: more types, more partitions, non-monotonic,
+    // negative ranges, boundary values
+    // ============================================================
+
+    // ---- Setup: BIGINT range, 20 partitions (0–1,000,000 step 50,000) ----
+    sql "drop table if exists rf_prune_range_bigint"
+    def bigintPartitions = (0..19).collect { i ->
+        def lo = i * 50000L
+        def hi = (i + 1) * 50000L
+        "PARTITION p${i} VALUES [(\"${lo}\"), (\"${hi}\"))"
+    }.join(",\n            ")
+    sql """
+        CREATE TABLE rf_prune_range_bigint (
+            id INT NOT NULL,
+            bval BIGINT NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY RANGE(bval) (
+            ${bigintPartitions}
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+    // Insert 2 rows per partition
+    def bigintInsertVals = (0..19).collect { i ->
+        def v1 = i * 50000L + 100
+        def v2 = i * 50000L + 25000
+        "(${i * 2 + 1}, ${v1}, 'v${i}a'), (${i * 2 + 2}, ${v2}, 'v${i}b')"
+    }.join(",\n        ")
+    sql "INSERT INTO rf_prune_range_bigint VALUES ${bigintInsertVals}"
+
+    // Dim: only values in p0 [0,50000) and p5 [250000,300000) → 18 pruned
+    sql "drop table if exists rf_prune_dim_bigint"
+    sql """
+        CREATE TABLE rf_prune_dim_bigint (
+            dim_key BIGINT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_bigint VALUES (100, 'x'), (25000, 'y'), (250100, 'z'), (275000, 'w')"""
+
+    // Test 11: BIGINT range (20 partitions) + IN → 18 pruned
+    order_qt_bigint_range_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.bval, f.value
+        FROM rf_prune_range_bigint f
+        JOIN rf_prune_dim_bigint d ON f.bval = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_bigint f JOIN rf_prune_dim_bigint d ON f.bval = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 20, 18)
+
+    // Test 12: BIGINT range (20 partitions) + MinMax → 18 pruned
+    // min=100, max=275000 → covers p0–p5 (6 partitions), so 14 pruned
+    order_qt_bigint_range_minmax """
+        SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+            f.id, f.bval, f.value
+        FROM rf_prune_range_bigint f
+        JOIN rf_prune_dim_bigint d ON f.bval = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_bigint f JOIN rf_prune_dim_bigint d ON f.bval = d.dim_key",
+        "MIN_MAX", 20, 14)
+
+    // ---- Setup: DATETIME range, 8 partitions (quarterly, 2024-2025) ----
+    sql "drop table if exists rf_prune_range_datetime"
+    sql """
+        CREATE TABLE rf_prune_range_datetime (
+            id INT NOT NULL,
+            ts DATETIME NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY RANGE(ts) (
+            PARTITION p2024q1 VALUES [("2024-01-01 00:00:00"), ("2024-04-01 00:00:00")),
+            PARTITION p2024q2 VALUES [("2024-04-01 00:00:00"), ("2024-07-01 00:00:00")),
+            PARTITION p2024q3 VALUES [("2024-07-01 00:00:00"), ("2024-10-01 00:00:00")),
+            PARTITION p2024q4 VALUES [("2024-10-01 00:00:00"), ("2025-01-01 00:00:00")),
+            PARTITION p2025q1 VALUES [("2025-01-01 00:00:00"), ("2025-04-01 00:00:00")),
+            PARTITION p2025q2 VALUES [("2025-04-01 00:00:00"), ("2025-07-01 00:00:00")),
+            PARTITION p2025q3 VALUES [("2025-07-01 00:00:00"), ("2025-10-01 00:00:00")),
+            PARTITION p2025q4 VALUES [("2025-10-01 00:00:00"), ("2026-01-01 00:00:00"))
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_range_datetime VALUES
+        (1, '2024-02-15 10:00:00', 'jan'), (2, '2024-05-20 12:00:00', 'may'),
+        (3, '2024-08-10 08:00:00', 'aug'), (4, '2024-11-25 14:00:00', 'nov'),
+        (5, '2025-02-01 09:00:00', 'feb25'), (6, '2025-05-15 11:00:00', 'may25'),
+        (7, '2025-08-20 16:00:00', 'aug25'), (8, '2025-11-10 13:00:00', 'nov25')
+    """
+
+    sql "drop table if exists rf_prune_dim_datetime"
+    sql """
+        CREATE TABLE rf_prune_dim_datetime (
+            dim_ts DATETIME NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_ts) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    // Only 2024 Q1 timestamps → 7 of 8 partitions pruned
+    sql """INSERT INTO rf_prune_dim_datetime VALUES
+        ('2024-02-15 10:00:00', 'x'), ('2024-03-01 00:00:00', 'y')
+    """
+
+    // Test 13: DATETIME range (8 partitions) + IN → 7 pruned
+    order_qt_datetime_range_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.ts, f.value
+        FROM rf_prune_range_datetime f
+        JOIN rf_prune_dim_datetime d ON f.ts = d.dim_ts
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_datetime f JOIN rf_prune_dim_datetime d ON f.ts = d.dim_ts",
+        "IN_OR_BLOOM_FILTER", 8, 7)
+
+    // Test 14: DATETIME range (8 partitions) + MinMax → 7 pruned
+    order_qt_datetime_range_minmax """
+        SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+            f.id, f.ts, f.value
+        FROM rf_prune_range_datetime f
+        JOIN rf_prune_dim_datetime d ON f.ts = d.dim_ts
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_datetime f JOIN rf_prune_dim_datetime d ON f.ts = d.dim_ts",
+        "MIN_MAX", 8, 7)
+
+    // ---- Setup: 50 INT list partitions ----
+    sql "drop table if exists rf_prune_list_50"
+    def list50Partitions = (1..50).collect { i ->
+        "PARTITION p_${i} VALUES IN (\"${i}\")"
+    }.join(",\n            ")
+    sql """
+        CREATE TABLE rf_prune_list_50 (
+            id INT NOT NULL,
+            cat_id INT NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY LIST(cat_id) (
+            ${list50Partitions}
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+    def list50InsertVals = (1..50).collect { i ->
+        "(${i}, ${i}, 'cat${i}')"
+    }.join(", ")
+    sql "INSERT INTO rf_prune_list_50 VALUES ${list50InsertVals}"
+
+    sql "drop table if exists rf_prune_dim_50"
+    sql """
+        CREATE TABLE rf_prune_dim_50 (
+            dim_cat INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_cat) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    // Match 5 of 50 → 45 pruned
+    sql """INSERT INTO rf_prune_dim_50 VALUES (3, 'a'), (10, 'b'), (25, 'c'), (40, 'd'), (50, 'e')"""
+
+    // Test 15: 50 list partitions + IN → 45 pruned
+    order_qt_list_50_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.cat_id, f.value
+        FROM rf_prune_list_50 f
+        JOIN rf_prune_dim_50 d ON f.cat_id = d.dim_cat
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_list_50 f JOIN rf_prune_dim_50 d ON f.cat_id = d.dim_cat",
+        "IN_OR_BLOOM_FILTER", 50, 45)
+
+    // ---- Non-monotonic expression tests ----
+    // Test 16: Join on abs(part_col) — target expr is NOT a SlotRef → 0 pruned
+    // Using existing rf_prune_range_int (4 partitions), dim has value 50
+    sql "drop table if exists rf_prune_dim_abs"
+    sql """
+        CREATE TABLE rf_prune_dim_abs (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_abs VALUES (50, 'x')"""
+
+    def token16 = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token16}", f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_abs d ON abs(f.part_col) = d.dim_key
+    """
+    def profile16 = getProfileByToken(token16)
+    def pruned16 = extractCounterSum(profile16, "PartitionsPrunedByRuntimeFilter")
+    logger.info("non_monotonic abs: pruned=${pruned16}")
+    assertTrue(pruned16 == 0, "Non-monotonic expr should not prune, got ${pruned16}")
+
+    // Test 17: Join on f.part_col % 100 — non-monotonic → 0 pruned
+    sql "drop table if exists rf_prune_dim_mod"
+    sql """
+        CREATE TABLE rf_prune_dim_mod (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_mod VALUES (10, 'x')"""
+
+    def token17 = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token17}", f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_mod d ON (f.part_col % 100) = d.dim_key
+    """
+    def profile17 = getProfileByToken(token17)
+    def pruned17 = extractCounterSum(profile17, "PartitionsPrunedByRuntimeFilter")
+    logger.info("non_monotonic mod: pruned=${pruned17}")
+    assertTrue(pruned17 == 0, "Non-monotonic mod expr should not prune, got ${pruned17}")
+
+    // Test 18: Join on non-partition column (f.id) → 0 partition pruning
+    sql "drop table if exists rf_prune_dim_nonpart"
+    sql """
+        CREATE TABLE rf_prune_dim_nonpart (
+            dim_id INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_id) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_nonpart VALUES (1, 'x'), (3, 'y')"""
+
+    def token18 = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token18}", f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_nonpart d ON f.id = d.dim_id
+    """
+    def profile18 = getProfileByToken(token18)
+    def pruned18 = extractCounterSum(profile18, "PartitionsPrunedByRuntimeFilter")
+    logger.info("non_partition_col: pruned=${pruned18}")
+    assertTrue(pruned18 == 0, "Join on non-partition col should not prune, got ${pruned18}")
+
+    // ---- Negative value range partitions ----
+    sql "drop table if exists rf_prune_range_neg"
+    sql """
+        CREATE TABLE rf_prune_range_neg (
+            id INT NOT NULL,
+            neg_col INT NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY RANGE(neg_col) (
+            PARTITION pn4 VALUES [("-400"), ("-300")),
+            PARTITION pn3 VALUES [("-300"), ("-200")),
+            PARTITION pn2 VALUES [("-200"), ("-100")),
+            PARTITION pn1 VALUES [("-100"), ("0"))
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_range_neg VALUES
+        (1, -350, 'a'), (2, -310, 'b'),
+        (3, -250, 'c'), (4, -210, 'd'),
+        (5, -150, 'e'), (6, -110, 'f'),
+        (7, -50, 'g'), (8, -10, 'h')
+    """
+
+    sql "drop table if exists rf_prune_dim_neg"
+    sql """
+        CREATE TABLE rf_prune_dim_neg (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    // Only values in pn1 [-100, 0) → prune 3 of 4
+    sql """INSERT INTO rf_prune_dim_neg VALUES (-50, 'x'), (-10, 'y')"""
+
+    // Test 19: Negative range + IN → 3 pruned
+    order_qt_neg_range_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.neg_col, f.value
+        FROM rf_prune_range_neg f
+        JOIN rf_prune_dim_neg d ON f.neg_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_neg f JOIN rf_prune_dim_neg d ON f.neg_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 4, 3)
+
+    // Test 20: Negative range + MinMax → 3 pruned
+    order_qt_neg_range_minmax """
+        SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+            f.id, f.neg_col, f.value
+        FROM rf_prune_range_neg f
+        JOIN rf_prune_dim_neg d ON f.neg_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_neg f JOIN rf_prune_dim_neg d ON f.neg_col = d.dim_key",
+        "MIN_MAX", 4, 3)
+
+    // ---- Boundary value tests ----
+    // Dim has exact partition boundary values: 100, 200, 300
+    // 100 ∈ p2 [100,200), 200 ∈ p3 [200,300), 300 ∈ p4 [300,400)
+    // Only p1 [0,100) is pruned → 1 pruned
+    sql "drop table if exists rf_prune_dim_boundary"
+    sql """
+        CREATE TABLE rf_prune_dim_boundary (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_boundary VALUES (100, 'a'), (200, 'b'), (300, 'c')"""
+
+    def token21 = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token21}", f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_boundary d ON f.part_col = d.dim_key
+    """
+    def profile21 = getProfileByToken(token21)
+    def pruned21 = extractCounterSum(profile21, "PartitionsPrunedByRuntimeFilter")
+    def total21 = extractCounterSum(profile21, "TotalPartitionsForRFPruning")
+    logger.info("boundary_exact: total=${total21}, pruned=${pruned21}")
+    // p1 is pruned (no value 0-99 in dim), p2/p3/p4 match boundary values
+    assertTrue(pruned21 >= 1, "Boundary test: expected >= 1 pruned, got ${pruned21}")
+
+    // Test 22: Dim values outside all partitions → all pruned, empty result
+    sql "drop table if exists rf_prune_dim_outside"
+    sql """
+        CREATE TABLE rf_prune_dim_outside (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_outside VALUES (500, 'x'), (600, 'y'), (700, 'z')"""
+
+    def token22 = UUID.randomUUID().toString()
+    def result22 = sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token22}", f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_outside d ON f.part_col = d.dim_key
+    """
+    assertTrue(result22.isEmpty(), "Dim outside all partitions should return empty result")
+    def profile22 = getProfileByToken(token22)
+    def pruned22 = extractCounterSum(profile22, "PartitionsPrunedByRuntimeFilter")
+    logger.info("outside_all: pruned=${pruned22}")
+    // All 4 partitions should be pruned since RF IN {500,600,700} intersects none
+    assertTrue(pruned22 >= 4, "All partitions should be pruned, got ${pruned22}")
+
+    // ---- Descending dim value order test ----
+    // Verify that dim insertion order (desc vs asc) doesn't affect pruning
+    sql "drop table if exists rf_prune_dim_desc"
+    sql """
+        CREATE TABLE rf_prune_dim_desc (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    // Same values as rf_prune_dim_int {10, 50, 90} but inserted in descending order
+    sql """INSERT INTO rf_prune_dim_desc VALUES (90, 'z'), (50, 'y'), (10, 'x')"""
+
+    // Test 23: Descending dim order + IN → should still prune 3 of 4
+    order_qt_desc_dim_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_desc d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_int f JOIN rf_prune_dim_desc d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 4, 3)
+
+    // Test 24: Descending dim order + MinMax → should still prune 3 of 4
+    order_qt_desc_dim_minmax """
+        SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_desc d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_int f JOIN rf_prune_dim_desc d ON f.part_col = d.dim_key",
+        "MIN_MAX", 4, 3)
+
+    // ---- Combined RF types test ----
+    // Test 25: Both IN and MinMax filters simultaneously
+    order_qt_combined_rf """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER,MIN_MAX') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_int f JOIN rf_prune_dim_int d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER,MIN_MAX", 4, 3)
+
+    // ---- Multi-partition match with MinMax spanning multiple partitions ----
+    // Dim values span p1 and p3: {50, 250} → MinMax [50, 250] covers p1, p2, p3
+    // IN filter prunes p2, p4 (2 pruned). MinMax prunes only p4 (1 pruned).
+    sql "drop table if exists rf_prune_dim_span"
+    sql """
+        CREATE TABLE rf_prune_dim_span (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_span VALUES (50, 'x'), (250, 'y')"""
+
+    // Test 26: Spanning dim + IN → 2 pruned (p2, p4 don't contain 50 or 250)
+    order_qt_span_in """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_span d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_int f JOIN rf_prune_dim_span d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 4, 2)
+
+    // Test 27: Spanning dim + MinMax → [50,250] covers p1,p2,p3; only p4 pruned → 1 pruned
+    order_qt_span_minmax """
+        SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_span d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_int f JOIN rf_prune_dim_span d ON f.part_col = d.dim_key",
+        "MIN_MAX", 4, 1)
+
+    // ============================================================
+    // Tests 28-32: LIST partition with NULL key (only_null & mixed)
+    // ============================================================
+    sql "drop table if exists rf_prune_list_null"
+    sql """
+        CREATE TABLE rf_prune_list_null (
+            id INT NOT NULL,
+            part_col INT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY LIST(part_col) (
+            PARTITION p_null VALUES IN ((NULL)),
+            PARTITION p_one VALUES IN ("1"),
+            PARTITION p_two VALUES IN ("2"),
+            PARTITION p_three VALUES IN ("3")
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 2
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_list_null VALUES
+        (1, NULL, 'a'), (2, NULL, 'b'),
+        (3, 1, 'c'), (4, 1, 'd'),
+        (5, 2, 'e'),
+        (6, 3, 'f')"""
+
+    sql "drop table if exists rf_prune_dim_one"
+    sql """
+        CREATE TABLE rf_prune_dim_one (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_one VALUES (1, 'x')"""
+
+    // Test 28: Non-null-aware RF {1} on LIST{p_null,p_one,p_two,p_three}
+    //   p_null is only_null → pruned (RF !contain NULL)
+    //   p_two, p_three → pruned (no value match)
+    //   p_one → kept
+    order_qt_list_only_null_pruned """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_list_null f
+        JOIN rf_prune_dim_one d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_list_null f JOIN rf_prune_dim_one d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 4, 3)
+
+    // Test 29: Same with MIN_MAX → MinMax [1,1] excludes p_null/p_two/p_three → 3 pruned
+    order_qt_list_only_null_minmax """
+        SELECT /*+ SET_VAR(runtime_filter_type='MIN_MAX') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_list_null f
+        JOIN rf_prune_dim_one d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_list_null f JOIN rf_prune_dim_one d ON f.part_col = d.dim_key",
+        "MIN_MAX", 4, 3)
+
+    // Test 30: Mixed NULL+value partition. Build a list where one partition holds {NULL, 5}.
+    sql "drop table if exists rf_prune_list_mixed"
+    sql """
+        CREATE TABLE rf_prune_list_mixed (
+            id INT NOT NULL,
+            part_col INT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY LIST(part_col) (
+            PARTITION p_a VALUES IN ((NULL), ("5")),
+            PARTITION p_b VALUES IN (("10")),
+            PARTITION p_c VALUES IN (("20"))
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 2
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_list_mixed VALUES
+        (1, NULL, 'a'), (2, 5, 'b'),
+        (3, 10, 'c'),
+        (4, 20, 'd')"""
+
+    sql "drop table if exists rf_prune_dim_five"
+    sql """
+        CREATE TABLE rf_prune_dim_five (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_five VALUES (5, 'x')"""
+
+    // Test 30: Mixed partition {NULL,5} + RF {5} (non-null-aware) → p_a kept, p_b/p_c pruned
+    // Crucial regression: validates that concrete value 5 survives the NULL marker.
+    order_qt_list_mixed_value_kept """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.part_col, f.value
+        FROM rf_prune_list_mixed f
+        JOIN rf_prune_dim_five d ON f.part_col = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_list_mixed f JOIN rf_prune_dim_five d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 3, 2)
+
+    // Test 31: Mixed partition {NULL,5} + RF {7} (no value match, RF non-null-aware)
+    //   p_a still pruned (NULL row can't match non-null RF; concrete 5 != 7)
+    //   p_b, p_c pruned
+    sql "drop table if exists rf_prune_dim_seven"
+    sql """
+        CREATE TABLE rf_prune_dim_seven (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_seven VALUES (7, 'x')"""
+
+    def token_mixed_all = UUID.randomUUID().toString()
+    def res_mixed_all = sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token_mixed_all}", f.id
+        FROM rf_prune_list_mixed f
+        JOIN rf_prune_dim_seven d ON f.part_col = d.dim_key
+    """
+    assertTrue(res_mixed_all.isEmpty(), "RF {7} matches no row, expect empty")
+    assertPruningProfile(
+        "* FROM rf_prune_list_mixed f JOIN rf_prune_dim_seven d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 3, 3)
+
+    // Test 32: Null-safe equal join (<=>) on mixed partition. RF is null_aware AND
+    //   contains NULL (build side has NULL key), so p_a (which contains NULL) MUST
+    //   be kept even if RF concrete values miss 5.
+    sql "drop table if exists rf_prune_dim_null"
+    sql """
+        CREATE TABLE rf_prune_dim_null (
+            dim_key INT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_val) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_null VALUES (NULL, 'n')"""
+
+    order_qt_list_mixed_nullsafe """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.part_col
+        FROM rf_prune_list_mixed f
+        JOIN rf_prune_dim_null d ON f.part_col <=> d.dim_key
+    """
+
+    // ============================================================
+    // Tests 33-34: Multi-column RANGE projection (closed [L1, U1])
+    // ============================================================
+    // Critical: partition [(1,1),(1,5)) only contains rows whose first key == 1.
+    // Naively projecting to [1,1) (half-open) would mark it as empty and wrongly
+    // prune it when RF {1} probes. The fix uses closed [1,1].
+    sql "drop table if exists rf_prune_range_multi"
+    sql """
+        CREATE TABLE rf_prune_range_multi (
+            id INT NOT NULL,
+            k1 INT NOT NULL,
+            k2 INT NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY RANGE(k1, k2) (
+            PARTITION p_a VALUES [("1", "1"), ("1", "5")),
+            PARTITION p_b VALUES [("1", "5"), ("2", "0")),
+            PARTITION p_c VALUES [("2", "0"), ("3", "0"))
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 2
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_range_multi VALUES
+        (1, 1, 1, 'a'), (2, 1, 4, 'b'),
+        (3, 1, 5, 'c'), (4, 1, 9, 'd'),
+        (5, 2, 0, 'e'), (6, 2, 9, 'f')"""
+
+    // Test 33: RF {1} probing k1 → must keep p_a AND p_b (both span first-col=1),
+    //   prune p_c. Validates closed-upper-bound projection.
+    order_qt_range_multi_first_col """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            f.id, f.k1, f.k2, f.value
+        FROM rf_prune_range_multi f
+        JOIN rf_prune_dim_one d ON f.k1 = d.dim_key
+    """
+    assertPruningProfile(
+        "* FROM rf_prune_range_multi f JOIN rf_prune_dim_one d ON f.k1 = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 3, 1)
+
+    // Test 34: MIN_MAX RF {1} → MinMax [1,1] still hits both p_a and p_b, prune p_c
+    assertPruningProfile(
+        "* FROM rf_prune_range_multi f JOIN rf_prune_dim_one d ON f.k1 = d.dim_key",
+        "MIN_MAX", 3, 1)
+
+    // ============================================================
+    // Tests 35-41: Type coverage
+    // ============================================================
+    // Each test creates a 4-partition RANGE table over a different type and
+    // joins with a 1-row dim that should match exactly one partition.
+    def runTypeCoverage = { String suffix, String partType, String dimType,
+                            String partLits, String dimVal, List<String> rowVals ->
+        sql "drop table if exists rf_prune_type_${suffix}"
+        sql """
+            CREATE TABLE rf_prune_type_${suffix} (
+                id INT NOT NULL,
+                part_col ${partType} NOT NULL,
+                value VARCHAR(64)
+            )
+            PARTITION BY RANGE(part_col) (
+                PARTITION p1 VALUES [(${partLits.split('\\|')[0]}), (${partLits.split('\\|')[1]})),
+                PARTITION p2 VALUES [(${partLits.split('\\|')[1]}), (${partLits.split('\\|')[2]})),
+                PARTITION p3 VALUES [(${partLits.split('\\|')[2]}), (${partLits.split('\\|')[3]})),
+                PARTITION p4 VALUES [(${partLits.split('\\|')[3]}), (${partLits.split('\\|')[4]}))
+            )
+            DISTRIBUTED BY HASH(id) BUCKETS 2
+            PROPERTIES("replication_num" = "1")
+        """
+        def insertRows = rowVals.withIndex().collect { v, i -> "(${i + 1}, ${v}, 'r${i}')" }.join(", ")
+        sql "INSERT INTO rf_prune_type_${suffix} VALUES ${insertRows}"
+        sql "drop table if exists rf_prune_dim_type_${suffix}"
+        sql """
+            CREATE TABLE rf_prune_dim_type_${suffix} (
+                dim_key ${dimType} NOT NULL,
+                dim_val VARCHAR(32)
+            )
+            DISTRIBUTED BY HASH(dim_val) BUCKETS 1
+            PROPERTIES("replication_num" = "1")
+        """
+        sql "INSERT INTO rf_prune_dim_type_${suffix} VALUES (${dimVal}, 'x')"
+        assertPruningProfile(
+            "* FROM rf_prune_type_${suffix} f JOIN rf_prune_dim_type_${suffix} d ON f.part_col = d.dim_key",
+            "IN_OR_BLOOM_FILTER", 4, 3)
+    }
+
+    // Test 35: TINYINT
+    runTypeCoverage("tinyint", "TINYINT", "TINYINT",
+        '"-100"|"-50"|"0"|"50"|"100"', "-75",
+        ["-90", "-60", "-25", "10", "25", "75"])
+
+    // Test 36: SMALLINT
+    runTypeCoverage("smallint", "SMALLINT", "SMALLINT",
+        '"0"|"1000"|"2000"|"3000"|"4000"', "1500",
+        ["100", "1500", "2500", "3500"])
+
+    // Test 37: LARGEINT
+    runTypeCoverage("largeint", "LARGEINT", "LARGEINT",
+        '"0"|"100000000000"|"200000000000"|"300000000000"|"400000000000"',
+        "150000000000",
+        ["50000000000", "150000000000", "250000000000", "350000000000"])
+
+    // Test 38: DECIMAL is intentionally skipped — Doris does not allow
+    // DECIMAL columns as RANGE partition keys, and the rest of this suite
+    // already exercises decimal-typed RF targets via casts.
+
+    // Test 39: DATE
+    runTypeCoverage("date", "DATE", "DATE",
+        '"2023-01-01"|"2023-04-01"|"2023-07-01"|"2023-10-01"|"2024-01-01"',
+        '"2023-05-15"',
+        ['"2023-02-15"', '"2023-05-15"', '"2023-08-15"', '"2023-11-15"'])
+
+    // Test 40: DATETIME
+    runTypeCoverage("datetime", "DATETIME", "DATETIME",
+        '"2023-01-01 00:00:00"|"2023-04-01 00:00:00"|"2023-07-01 00:00:00"|"2023-10-01 00:00:00"|"2024-01-01 00:00:00"',
+        '"2023-05-15 12:00:00"',
+        ['"2023-02-15 00:00:00"', '"2023-05-15 12:00:00"', '"2023-08-15 00:00:00"', '"2023-11-15 00:00:00"'])
+
+    // Test 41: VARCHAR is intentionally skipped — Doris does not allow
+    // VARCHAR columns as RANGE partition keys.
+
+    // ============================================================
+    // Tests 42-47: Monotonic / non-monotonic target_expr pruning
+    //
+    // FE classifier is currently conservative: only an identity SlotRef on a
+    // partition column is treated as MONOTONIC_INCREASING; any wrapping
+    // expression (including Cast and other Nereids `Monotonic` functions) is
+    // classified NON_MONOTONIC and therefore does not drive partition pruning.
+    // This will be revisited once the `Monotonic.isMonotonic(lower, upper)`
+    // implementations are completed/audited so we can safely walk through
+    // monotonic chains. Until then these tests pin the conservative behavior.
+    //
+    // NOTE: Nereids' RuntimeFilterPushDownVisitor itself only allows
+    // non-trivial target expressions when the input is either (a) a numeric
+    // single-slot expression, or (b) a chain of Cast wrapping a single slot.
+    // Date functions like year(dt)/date_trunc(dt,...) on a DATE partition
+    // column do not currently produce a runtime filter at all, so they cannot
+    // be exercised here regardless of the classifier choice.
+    // ============================================================
+
+    sql "drop table if exists rf_prune_dim_bigint"
+    sql """
+        CREATE TABLE rf_prune_dim_bigint (
+            dim_key BIGINT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_bigint VALUES (50, 'x')"""
+
+    // Test 42: target_expr = cast(part_col as bigint). Cast is monotonic in
+    // theory but the conservative FE classifier treats any non-Slot target
+    // expression as NON_MONOTONIC, so partition pruning does not fire.
+    def token_cast = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token_cast}", f.id
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_bigint d ON cast(f.part_col as bigint) = d.dim_key
+    """
+    def profile_cast = getProfileByToken(token_cast)
+    def pruned_cast = extractCounterSum(profile_cast, "PartitionsPrunedByRuntimeFilter")
+    logger.info("conservative cast: pruned=${pruned_cast}")
+    assertTrue(pruned_cast == 0,
+        "Conservative classifier: Cast wrapper should not drive pruning, got ${pruned_cast}")
+
+    // Test 43: chained Cast — same reasoning as Test 42, expect no pruning.
+    def token_cast2 = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token_cast2}", f.id
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_int d ON cast(cast(f.part_col as bigint) as int) = d.dim_key
+    """
+    def profile_cast2 = getProfileByToken(token_cast2)
+    def pruned_cast2 = extractCounterSum(profile_cast2, "PartitionsPrunedByRuntimeFilter")
+    logger.info("conservative chained cast: pruned=${pruned_cast2}")
+    assertTrue(pruned_cast2 == 0,
+        "Conservative classifier: chained Cast should not drive pruning, got ${pruned_cast2}")
+
+    // Test 44: target_expr = cast(dt as datetime) on a DATE partition column.
+    // Cast on DATE→DATETIME is non-trivial and the RF generator emits the
+    // filter, but the conservative classifier still rejects the wrapper.
+    sql "drop table if exists rf_prune_dim_dt"
+    sql """
+        CREATE TABLE rf_prune_dim_dt (
+            dim_dt DATETIME NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_dt) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_dt VALUES ('2024-02-20 00:00:00', 'x')"""
+    def token_castdt = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token_castdt}", f.id
+        FROM rf_prune_range_date f
+        JOIN rf_prune_dim_dt d ON cast(f.dt as datetime) = d.dim_dt
+    """
+    def profile_castdt = getProfileByToken(token_castdt)
+    def pruned_castdt = extractCounterSum(profile_castdt, "PartitionsPrunedByRuntimeFilter")
+    logger.info("conservative cast date->datetime: pruned=${pruned_castdt}")
+    assertTrue(pruned_castdt == 0,
+        "Conservative classifier: Cast date->datetime should not drive pruning, got ${pruned_castdt}")
+
+    // Test 45: target_expr = -part_col (Negative). Numeric input slot lets
+    // the RF generator emit a filter, but Negative is not declared Monotonic
+    // in Nereids → classifier returns NON_MONOTONIC → no partition pruning.
+    sql "drop table if exists rf_prune_dim_neg2"
+    sql """
+        CREATE TABLE rf_prune_dim_neg2 (
+            dim_key INT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_neg2 VALUES (-50, 'x')"""
+    def token_neg = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token_neg}", f.id
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_neg2 d ON (-f.part_col) = d.dim_key
+    """
+    def profile_neg = getProfileByToken(token_neg)
+    def pruned_neg = extractCounterSum(profile_neg, "PartitionsPrunedByRuntimeFilter")
+    logger.info("non_monotonic neg: pruned=${pruned_neg}")
+    assertTrue(pruned_neg == 0,
+        "Negative is not declared Monotonic in Nereids; should not prune, got ${pruned_neg}")
+
+    // Test 46: Test for non-monotonic Add was removed because Nereids
+    // constant-folds `part_col + 10 = 60` into `part_col = 50`, so the RF
+    // ends up identity and pruning happens. The Negative case in Test 45
+    // already covers the non-monotonic single-slot probe path.
+
+    // Test 47: Cast wrapping a non-partition column — cast(f.id as bigint).
+    // Cast is monotonic, but f.id is not a partition column of the table
+    // (rf_prune_range_int is partitioned on part_col). Classifier walks the
+    // Cast, lands on a non-partition slot, returns NON_MONOTONIC → no
+    // partition pruning even though the chain is monotonic.
+    sql "drop table if exists rf_prune_dim_id_bigint"
+    sql """
+        CREATE TABLE rf_prune_dim_id_bigint (
+            dim_key BIGINT NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_id_bigint VALUES (5, 'x')"""
+    def token_id = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token_id}", f.id
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_id_bigint d ON cast(f.id as bigint) = d.dim_key
+    """
+    def profile_id = getProfileByToken(token_id)
+    def pruned_id = extractCounterSum(profile_id, "PartitionsPrunedByRuntimeFilter")
+    logger.info("monotonic chain on non-partition col: pruned=${pruned_id}")
+    assertTrue(pruned_id == 0,
+        "Monotonic chain on non-partition col should not prune, got ${pruned_id}")
+
+    // ============================================================
+    // Test 47b: String partition column (LIST partition on VARCHAR).
+    //
+    // Regression coverage for the BE pruner string path. ColumnValueRange
+    // for string types uses CppType=std::string while RF literals/HybridSet
+    // expose StringRef bytes; the pruner must construct std::string from
+    // those bytes (not reinterpret_cast). Without that fix this case would
+    // read garbage / crash in BE under ASAN. We exercise both IN_FILTER (via
+    // small build side) and BLOOM/MIN_MAX paths by forcing IN_OR_BLOOM_FILTER.
+    // ============================================================
+    sql "drop table if exists rf_prune_list_str"
+    sql """
+        CREATE TABLE rf_prune_list_str (
+            id INT,
+            part_col VARCHAR(16) NOT NULL
+        )
+        PARTITION BY LIST(part_col) (
+            PARTITION pa VALUES IN ('a','b'),
+            PARTITION pc VALUES IN ('c','d'),
+            PARTITION pe VALUES IN ('e','f'),
+            PARTITION pg VALUES IN ('g','h')
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_list_str VALUES
+        (1,'a'),(2,'b'),(3,'c'),(4,'d'),(5,'e'),(6,'f'),(7,'g'),(8,'h')"""
+
+    sql "drop table if exists rf_prune_dim_str"
+    sql """
+        CREATE TABLE rf_prune_dim_str (
+            dim_key VARCHAR(16) NOT NULL,
+            dim_val VARCHAR(32)
+        )
+        DISTRIBUTED BY HASH(dim_key) BUCKETS 1
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_dim_str VALUES ('c', 'x')"""
+
+    // RF {'c'} should keep only partition pc and prune the other three.
+    assertPruningProfile(
+        "* FROM rf_prune_list_str f JOIN rf_prune_dim_str d ON f.part_col = d.dim_key",
+        "IN_OR_BLOOM_FILTER", 4, 3)
+
+    // ============================================================
+    // Test 48: Grouped RF with multiple targets.
+    //
+    // The RF generated from d.dim_key -> f1.part_col is expanded through the
+    // inner join f1.part_col = f2.part_col, so Nereids creates two RF objects
+    // with the same source/type/builder and different scan targets. The legacy
+    // translator groups them into one RuntimeFilter with two targets. Both
+    // targets must retain partition-pruning monotonicity.
+    // ============================================================
+    sql "drop table if exists rf_prune_range_int_copy"
+    sql """
+        CREATE TABLE rf_prune_range_int_copy (
+            id INT NOT NULL,
+            part_col INT NOT NULL,
+            value VARCHAR(64)
+        )
+        PARTITION BY RANGE(part_col) (
+            PARTITION p1 VALUES [("0"), ("100")),
+            PARTITION p2 VALUES [("100"), ("200")),
+            PARTITION p3 VALUES [("200"), ("300")),
+            PARTITION p4 VALUES [("300"), ("400"))
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 4
+        PROPERTIES("replication_num" = "1")
+    """
+    sql """INSERT INTO rf_prune_range_int_copy VALUES
+        (1, 10, 'a'), (2, 20, 'b'), (3, 50, 'c'), (4, 80, 'd'), (5, 90, 'e'),
+        (6, 110, 'f'), (7, 120, 'g'), (8, 150, 'h'), (9, 180, 'i'), (10, 190, 'j'),
+        (11, 210, 'k'), (12, 220, 'l'), (13, 250, 'm'), (14, 280, 'n'), (15, 290, 'o'),
+        (16, 310, 'p'), (17, 320, 'q'), (18, 350, 'r'), (19, 380, 's'), (20, 390, 't')
+    """
+    assertPruningProfile(
+        "count(*) FROM rf_prune_range_int f1 "
+                + "JOIN rf_prune_range_int_copy f2 ON f1.part_col = f2.part_col "
+                + "JOIN rf_prune_dim_int d ON d.dim_key = f1.part_col",
+        "IN_OR_BLOOM_FILTER", 8, 6)
+
+    // ============================================================
+    // Test 49: Switch off enable_runtime_filter_partition_prune → no pruning
+    // ============================================================
+    sql "set enable_runtime_filter_partition_prune=false"
+    def token_off = UUID.randomUUID().toString()
+    sql """
+        SELECT /*+ SET_VAR(runtime_filter_type='IN_OR_BLOOM_FILTER') */
+            "${token_off}", f.id
+        FROM rf_prune_range_int f
+        JOIN rf_prune_dim_int d ON f.part_col = d.dim_key
+    """
+    def profile_off = getProfileByToken(token_off)
+    def total_off = extractCounterSum(profile_off, "TotalPartitionsForRFPruning")
+    def pruned_off = extractCounterSum(profile_off, "PartitionsPrunedByRuntimeFilter")
+    logger.info("switch_off: total=${total_off}, pruned=${pruned_off}")
+    assertTrue(total_off == 0,
+        "When switched off, no partitions should be considered (got total=${total_off})")
+    assertTrue(pruned_off == 0,
+        "When switched off, no partitions should be pruned (got pruned=${pruned_off})")
+    sql "set enable_runtime_filter_partition_prune=true"
+}