[fix](be) Tighten runtime filter partition pruning checks

### What problem does this PR solve?

Issue Number: None

Related PR: None

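Problem Summary: Runtime filter partition pruning accepted malformed NULL partition boundaries on non-nullable slots, carried an unreachable null-expression path, used FILTER_IN as an invalid comparison sentinel, and checked already-pruned scanners only after prepare/open work. This change fails fast on invalid NULL boundary metadata (a NULL list value or range_start on a non-nullable slot, and a NULL range_end on any slot), asserts the non-null expression invariant directly, returns std::optional from the comparison opcode conversion and checks that the opcode is recognized instead of silently skipping it, drops the separate StringRef constexpr branches, documents LIST partition monotonicity usage, and skips scanner prepare/open and late-arrival runtime filter appending when the partition is already pruned.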

### Release note

None

### Check List (For Author)

- Test: Unit Test
    - ./run-be-ut.sh --run --filter=RuntimeFilterPartitionPrunerTest.* -j 16
    - clang-format --dry-run --Werror be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp be/src/exec/scan/scanner_scheduler.cpp be/test/exec/runtime_filter/runtime_filter_partition_pruner_test.cpp
    - build-support/run-clang-tidy.sh --build-dir be/ut_build_ASAN (blocked by existing clang-tidy-16 parse errors from be/src/util/jni-util.h static_assert(false))
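- Behavior changed: Yes. Invalid NULL partition boundaries (NULL list values or range_start on non-nullable slots; NULL range_end on any slot), null expressions, and unrecognized comparison opcodes now fail a DORIS_CHECK instead of being ignored or skipped, and already-pruned scanners skip prepare/open and late-arrival runtime filter appending.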
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
diff --git a/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp
index e65577e..a25059b 100644
--- a/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_partition_pruner.cpp
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/PlanNodes_types.h>
 
+#include <algorithm>
 #include <optional>
 #include <unordered_set>
 #include <utility>
@@ -72,88 +73,88 @@
 
         bool parsed_ok = false;
 
-#define BUILD_BOUNDARY_CVR(NAME)                                                               \
-    case TYPE_##NAME: {                                                                        \
-        using CppType = typename PrimitiveTypeTraits<TYPE_##NAME>::CppType;                    \
-        bool is_list = tb.__isset.list_values && !tb.list_values.empty();                      \
-        bool is_range = tb.__isset.range_start || tb.__isset.range_end;                        \
-        DORIS_CHECK(is_list || is_range);                                                      \
-        ColumnValueRange<TYPE_##NAME> cvr(slot->col_name(), is_nullable, precision, scale);    \
-        /* Returns nullopt if `node` is a NULL literal; the caller then sets contain_null  */  \
-        /* on the CVR instead of trying to extract a typed value (which would dereference  */  \
-        /* a null data pointer for the non-string branch).                                 */  \
-        auto parse_texpr_node = [&](const TExprNode& node) -> std::optional<CppType> {         \
-            if (node.node_type == TExprNodeType::NULL_LITERAL) {                               \
-                return std::nullopt;                                                           \
-            }                                                                                  \
-            /* `Field` value is copied into the CVR by `add_fixed_value` /             */      \
-            /* `add_range` (both take CppType by const-ref / by value), so the         */      \
-            /* temporary `Field`'s lifetime ending at this expression's full-statement */      \
-            /* boundary is safe -- including for `String` payloads.                    */      \
-            Field field = slot_type->get_field(node);                                          \
-            return std::make_optional<CppType>(field.get<TYPE_##NAME>());                      \
-        };                                                                                     \
-        if (is_list) {                                                                         \
-            auto empty_cvr = ColumnValueRange<TYPE_##NAME>::create_empty_column_value_range(   \
-                    is_nullable, precision, scale);                                            \
-            bool list_has_null = false;                                                        \
-            bool list_has_value = false;                                                       \
-            for (const auto& node : tb.list_values) {                                          \
-                auto parsed = parse_texpr_node(node);                                          \
-                if (!parsed) {                                                                 \
-                    list_has_null = true;                                                      \
-                    continue;                                                                  \
-                }                                                                              \
-                static_cast<void>(empty_cvr.add_fixed_value(*parsed));                         \
-                list_has_value = true;                                                         \
-            }                                                                                  \
-            if (list_has_value) {                                                              \
-                cvr.intersection(empty_cvr);                                                   \
-            }                                                                                  \
-            if (list_has_null && is_nullable) {                                                \
-                /* Track NULL membership on ParsedBoundary; calling          */                \
-                /* cvr.set_contain_null(true) here would invoke              */                \
-                /* set_empty_value_range() and discard the concrete fixed    */                \
-                /* values we just inserted, turning {NULL, v} into a         */                \
-                /* NULL-only boundary.                                       */                \
-                boundary.contains_null = true;                                                 \
-                if (!list_has_value) {                                                         \
-                    boundary.only_null = true;                                                 \
-                }                                                                              \
-            }                                                                                  \
-        } else {                                                                               \
-            bool range_has_null = false;                                                       \
-            if (is_nullable && !tb.__isset.range_start) {                                      \
-                range_has_null = true;                                                         \
-            }                                                                                  \
-            if (tb.__isset.range_start) {                                                      \
-                auto parsed = parse_texpr_node(tb.range_start);                                \
-                if (parsed) {                                                                  \
-                    static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, *parsed));         \
-                } else if (is_nullable) {                                                      \
-                    range_has_null = true;                                                     \
-                }                                                                              \
-            }                                                                                  \
-            if (tb.__isset.range_end) {                                                        \
-                auto parsed = parse_texpr_node(tb.range_end);                                  \
-                if (parsed) {                                                                  \
-                    /* Multi-column RANGE projection emits a CLOSED upper bound (see       */  \
-                    /* TPartitionBoundary.range_end_inclusive comment); single-column RANGE */ \
-                    /* keeps the natural OPEN upper bound matching Doris semantics.         */ \
-                    SQLFilterOp upper_op =                                                     \
-                            (tb.__isset.range_end_inclusive && tb.range_end_inclusive)         \
-                                    ? FILTER_LESS_OR_EQUAL                                     \
-                                    : FILTER_LESS;                                             \
-                    static_cast<void>(cvr.add_range(upper_op, *parsed));                       \
-                }                                                                              \
-            }                                                                                  \
-            if (range_has_null) {                                                              \
-                boundary.contains_null = true;                                                 \
-            }                                                                                  \
-        }                                                                                      \
-        boundary.boundary_cvr = std::move(cvr);                                                \
-        parsed_ok = true;                                                                      \
-        break;                                                                                 \
+#define BUILD_BOUNDARY_CVR(NAME)                                                                  \
+    case TYPE_##NAME: {                                                                           \
+        using CppType = typename PrimitiveTypeTraits<TYPE_##NAME>::CppType;                       \
+        bool is_list = tb.__isset.list_values && !tb.list_values.empty();                         \
+        bool is_range = tb.__isset.range_start || tb.__isset.range_end;                           \
+        DORIS_CHECK(is_list || is_range);                                                         \
+        ColumnValueRange<TYPE_##NAME> cvr(slot->col_name(), is_nullable, precision, scale);       \
+        /* Returns nullopt if `node` is a NULL literal; the caller then records the NULL   */     \
+        /* on ParsedBoundary (or rejects it) rather than extracting a typed value, which   */     \
+        /* would dereference a null data pointer for the non-string branch.                */     \
+        auto parse_texpr_node = [&](const TExprNode& node) -> std::optional<CppType> {            \
+            if (node.node_type == TExprNodeType::NULL_LITERAL) {                                  \
+                return std::nullopt;                                                              \
+            }                                                                                     \
+            /* `Field` value is copied into the CVR by `add_fixed_value` /             */         \
+            /* `add_range` (both take CppType by const-ref / by value), so the         */         \
+            /* temporary `Field`'s lifetime ending at this expression's full-statement */         \
+            /* boundary is safe -- including for `String` payloads.                    */         \
+            Field field = slot_type->get_field(node);                                             \
+            return std::make_optional<CppType>(field.get<TYPE_##NAME>());                         \
+        };                                                                                        \
+        if (is_list) {                                                                            \
+            auto empty_cvr = ColumnValueRange<TYPE_##NAME>::create_empty_column_value_range(      \
+                    is_nullable, precision, scale);                                               \
+            bool list_has_null = false;                                                           \
+            bool list_has_value = false;                                                          \
+            for (const auto& node : tb.list_values) {                                             \
+                auto parsed = parse_texpr_node(node);                                             \
+                if (!parsed) {                                                                    \
+                    list_has_null = true;                                                         \
+                    continue;                                                                     \
+                }                                                                                 \
+                static_cast<void>(empty_cvr.add_fixed_value(*parsed));                            \
+                list_has_value = true;                                                            \
+            }                                                                                     \
+            if (list_has_value) {                                                                 \
+                cvr.intersection(empty_cvr);                                                      \
+            }                                                                                     \
+            if (list_has_null) {                                                                  \
+                DORIS_CHECK(is_nullable);                                                         \
+                /* Track NULL membership on ParsedBoundary; calling          */                   \
+                /* cvr.set_contain_null(true) here would invoke              */                   \
+                /* set_empty_value_range() and discard the concrete fixed    */                   \
+                /* values we just inserted, turning {NULL, v} into a         */                   \
+                /* NULL-only boundary.                                       */                   \
+                boundary.contains_null = true;                                                    \
+                if (!list_has_value) {                                                            \
+                    boundary.only_null = true;                                                    \
+                }                                                                                 \
+            }                                                                                     \
+        } else {                                                                                  \
+            bool range_has_null = false;                                                          \
+            if (is_nullable && !tb.__isset.range_start) {                                         \
+                range_has_null = true;                                                            \
+            }                                                                                     \
+            if (tb.__isset.range_start) {                                                         \
+                auto parsed = parse_texpr_node(tb.range_start);                                   \
+                if (parsed) {                                                                     \
+                    static_cast<void>(cvr.add_range(FILTER_LARGER_OR_EQUAL, *parsed));            \
+                } else {                                                                          \
+                    DORIS_CHECK(is_nullable);                                                     \
+                    range_has_null = true;                                                        \
+                }                                                                                 \
+            }                                                                                     \
+            if (tb.__isset.range_end) {                                                           \
+                auto parsed = parse_texpr_node(tb.range_end);                                     \
+                DORIS_CHECK(parsed.has_value());                                                  \
+                /* Multi-column RANGE projection emits a CLOSED upper bound (see       */         \
+                /* TPartitionBoundary.range_end_inclusive comment); single-column RANGE */        \
+                /* keeps the natural OPEN upper bound matching Doris semantics.         */        \
+                SQLFilterOp upper_op = (tb.__isset.range_end_inclusive && tb.range_end_inclusive) \
+                                               ? FILTER_LESS_OR_EQUAL                             \
+                                               : FILTER_LESS;                                     \
+                static_cast<void>(cvr.add_range(upper_op, *parsed));                              \
+            }                                                                                     \
+            if (range_has_null) {                                                                 \
+                boundary.contains_null = true;                                                    \
+            }                                                                                     \
+        }                                                                                         \
+        boundary.boundary_cvr = std::move(cvr);                                                   \
+        parsed_ok = true;                                                                         \
+        break;                                                                                    \
     }
 
         switch (ptype) {
@@ -204,9 +205,7 @@
 // NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
 
 static bool find_unique_slot_ref_impl(const VExpr* expr, const VSlotRef** found) {
-    if (!expr) {
-        return true;
-    }
+    DORIS_CHECK(expr != nullptr);
     if (expr->is_slot_ref()) {
         const auto* slot = assert_cast<const VSlotRef*>(expr);
         if (*found == nullptr) {
@@ -219,12 +218,9 @@
         DORIS_CHECK((*found)->column_id() == slot->column_id());
         return true;
     }
-    for (const auto& child : expr->children()) {
-        if (!find_unique_slot_ref_impl(child.get(), found)) {
-            return false;
-        }
-    }
-    return true;
+    return std::ranges::all_of(expr->children(), [&](const auto& child) {
+        return find_unique_slot_ref_impl(child.get(), found);
+    });
 }
 
 static const VSlotRef* find_unique_slot_ref(const VExpr* expr) {
@@ -601,7 +597,7 @@
 }
 // NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
 
-static SQLFilterOp convert_opcode_to_filter_op(TExprOpcode::type op) {
+static std::optional<SQLFilterOp> convert_opcode_to_filter_op(TExprOpcode::type op) {
     switch (op) {
     case TExprOpcode::LE:
         return FILTER_LESS_OR_EQUAL;
@@ -612,7 +608,7 @@
     case TExprOpcode::GT:
         return FILTER_LARGER;
     default:
-        return FILTER_IN; // sentinel: caller should skip
+        return std::nullopt;
     }
 }
 
@@ -667,10 +663,6 @@
                                     const auto* str_val = reinterpret_cast<const StringRef*>(value);
                                     static_cast<void>(typed_rf_cvr.add_fixed_value(
                                             CppType(str_val->data, str_val->size)));
-                                } else if constexpr (std::is_same_v<CppType, StringRef>) {
-                                    const auto* str_val = reinterpret_cast<const StringRef*>(value);
-                                    static_cast<void>(typed_rf_cvr.add_fixed_value(
-                                            CppType(str_val->data, str_val->size)));
                                 } else {
                                     static_cast<void>(typed_rf_cvr.add_fixed_value(
                                             *reinterpret_cast<const CppType*>(value)));
@@ -693,21 +685,17 @@
                             // CppType is std::string, so construct one from
                             // the bytes rather than dereferencing as String.
                             val = CppType(data.data, data.size);
-                        } else if constexpr (std::is_same_v<CppType, StringRef>) {
-                            val = CppType(data.data, data.size);
                         } else {
                             val = *reinterpret_cast<const CppType*>(data.data);
                         }
 
-                        SQLFilterOp op = convert_opcode_to_filter_op(impl->op());
-                        if (op == FILTER_IN) {
-                            return; // unrecognized opcode, skip
-                        }
+                        auto op = convert_opcode_to_filter_op(impl->op());
+                        DORIS_CHECK(op.has_value());
 
                         CvrType typed_rf_cvr(boundary_cvr.column_name(),
                                              boundaries.front().is_nullable,
                                              boundary_cvr.precision(), boundary_cvr.scale());
-                        static_cast<void>(typed_rf_cvr.add_range(op, val));
+                        static_cast<void>(typed_rf_cvr.add_range(*op, val));
                         rf_cvr.emplace(std::move(typed_rf_cvr));
                     }
                 },
@@ -815,6 +803,8 @@
         VExprSPtr target_subtree = impl->children()[0];
 
         auto partition_mono_it = filter_id_to_partition_monotonicity.find(filter_id);
+        // For LIST partition targets, this metadata is also the per-partition eligibility map;
+        // BE projects every finite LIST value, so the direction itself is neutral there.
         bool has_partition_mono = partition_mono_it != filter_id_to_partition_monotonicity.end() &&
                                   !partition_mono_it->second.empty();
         if (!has_partition_mono) {
diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp
index 26cccd5..7e1deb2 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -127,6 +127,8 @@
     state->get_query_ctx()->set_low_memory_mode();
 }
 
+// NOLINTBEGIN(readability-function-cognitive-complexity,readability-function-size)
+// Existing scheduler loop owns scanner lifecycle and I/O accounting.
 void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                                      std::shared_ptr<ScanTask> scan_task) {
     auto task_lock = ctx->task_exec_ctx();
@@ -181,7 +183,9 @@
             // so better to also check low memory and clear free blocks here.
             if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
 
-            if (!scanner->has_prepared()) {
+            if (scanner->check_partition_pruned()) { eos = true; }
+
+            if (!eos && !scanner->has_prepared()) {
                 status = scanner->prepare();
                 if (!status.ok()) {
                     eos = true;
@@ -196,10 +200,12 @@
                 scanner->set_opened();
             }
 
-            Status rf_status = scanner->try_append_late_arrival_runtime_filter();
-            if (!rf_status.ok()) {
-                LOG(WARNING) << "Failed to append late arrival runtime filter: "
-                             << rf_status.to_string();
+            if (!eos) {
+                Status rf_status = scanner->try_append_late_arrival_runtime_filter();
+                if (!rf_status.ok()) {
+                    LOG(WARNING) << "Failed to append late arrival runtime filter: "
+                                 << rf_status.to_string();
+                }
             }
 
             // After processing late RFs, check if this scanner's partition was pruned.
@@ -304,6 +310,8 @@
 
     ctx->push_back_scan_task(scan_task);
 }
+// NOLINTEND(readability-function-cognitive-complexity,readability-function-size)
+
 int ScannerScheduler::default_local_scan_thread_num() {
     return config::doris_scanner_thread_pool_thread_num > 0
                    ? config::doris_scanner_thread_pool_thread_num
@@ -348,8 +356,7 @@
     size_t idx = 0;
     for (const auto& entry : *free_block) {
         // Virtual column must be materialized on the end of SegmentIterator's next batch method.
-        const ColumnNothing* column_nothing =
-                check_and_get_column<ColumnNothing>(entry.column.get());
+        const auto* column_nothing = check_and_get_column<ColumnNothing>(entry.column.get());
         if (column_nothing == nullptr) {
             idx++;
             continue;
diff --git a/be/test/exec/runtime_filter/runtime_filter_partition_pruner_test.cpp b/be/test/exec/runtime_filter/runtime_filter_partition_pruner_test.cpp
index 27a4c48..c419306 100644
--- a/be/test/exec/runtime_filter/runtime_filter_partition_pruner_test.cpp
+++ b/be/test/exec/runtime_filter/runtime_filter_partition_pruner_test.cpp
@@ -372,6 +372,32 @@
     EXPECT_DEATH({ static_cast<void>(parsed.parse({boundary}, slots)); }, "Check failed");
 }
 
+TEST_F(RuntimeFilterPartitionPrunerTest, NullBoundaryOnNonNullableSlotRejected) {
+    int32_t one = 1;
+    EXPECT_DEATH(
+            {
+                static_cast<void>(parse_boundaries(
+                        TYPE_INT, {list_boundary<TYPE_INT>(1, {null_node(TYPE_INT)})}, false));
+            },
+            "Check failed");
+
+    TPartitionBoundary null_start;
+    null_start.__set_partition_id(2);
+    null_start.__set_slot_id(SLOT_ID);
+    null_start.__set_range_start(null_node(TYPE_INT));
+    null_start.__set_range_end(literal_node<TYPE_INT>(one));
+    EXPECT_DEATH({ static_cast<void>(parse_boundaries(TYPE_INT, {null_start}, false)); },
+                 "Check failed");
+
+    TPartitionBoundary null_end;
+    null_end.__set_partition_id(3);
+    null_end.__set_slot_id(SLOT_ID);
+    null_end.__set_range_start(literal_node<TYPE_INT>(one));
+    null_end.__set_range_end(null_node(TYPE_INT));
+    EXPECT_DEATH({ static_cast<void>(parse_boundaries(TYPE_INT, {null_end}, false)); },
+                 "Check failed");
+}
+
 TEST_F(RuntimeFilterPartitionPrunerTest, InvalidPartitionMonotonicityRejected) {
     int32_t one = 1;
     std::vector<TPartitionBoundary> boundaries {