Fix conjunc (#64111)
diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp
index 5480dee..ed67c16 100644
--- a/be/src/exec/scan/file_scanner_v2.cpp
+++ b/be/src/exec/scan/file_scanner_v2.cpp
@@ -19,12 +19,14 @@
 
 #include <fmt/format.h>
 #include <gen_cpp/Exprs_types.h>
+#include <gen_cpp/ExternalTableSchema_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 
 #include <algorithm>
 #include <charconv>
 #include <map>
 #include <memory>
+#include <optional>
 #include <string>
 #include <string_view>
 #include <utility>
@@ -115,115 +117,411 @@
     return &parent->children.back();
 }
 
-Status add_struct_access_path(reader::TableColumn* column, const DataTypeStruct& struct_type,
-                              const std::vector<std::string>& path, size_t path_idx);
+const schema::external::TField* get_field_ptr(const schema::external::TFieldPtr& field_ptr) {
+    if (!field_ptr.__isset.field_ptr || field_ptr.field_ptr == nullptr) {
+        return nullptr;
+    }
+    return field_ptr.field_ptr.get();
+}
 
-Status add_access_path(reader::TableColumn* column, const DataTypePtr& type,
-                       const std::vector<std::string>& path, size_t path_idx) {
-    DORIS_CHECK(column != nullptr);
+DataTypePtr find_struct_child_type_by_name(const DataTypeStruct& struct_type,
+                                           const std::string& field_name) {
+    for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) {
+        if (to_lower(struct_type.get_element_name(field_idx)) == to_lower(field_name)) {
+            return struct_type.get_element(field_idx);
+        }
+    }
+    return nullptr;
+}
+
+reader::TableColumn build_schema_column_from_external_field(const schema::external::TField& field,
+                                                            DataTypePtr type) {
+    reader::TableColumn column {
+            .id = field.__isset.id ? field.id : -1,
+            .name = field.__isset.name ? field.name : "",
+            .type = std::move(type),
+            .children = {},
+            .default_expr = nullptr,
+            .is_partition_key = false,
+    };
+    if (column.type == nullptr || !field.__isset.nestedField) {
+        return column;
+    }
+
+    const auto nested_type = remove_nullable(column.type);
+    switch (nested_type->get_primitive_type()) {
+    case TYPE_STRUCT: {
+        if (!field.nestedField.__isset.struct_field ||
+            !field.nestedField.struct_field.__isset.fields) {
+            return column;
+        }
+        const auto& struct_type = assert_cast<const DataTypeStruct&>(*nested_type);
+        for (const auto& child_ptr : field.nestedField.struct_field.fields) {
+            const auto* child_field = get_field_ptr(child_ptr);
+            if (child_field == nullptr || !child_field->__isset.name) {
+                continue;
+            }
+            auto child_type = find_struct_child_type_by_name(struct_type, child_field->name);
+            if (child_type == nullptr) {
+                continue;
+            }
+            column.children.push_back(
+                    build_schema_column_from_external_field(*child_field, child_type));
+        }
+        break;
+    }
+    case TYPE_ARRAY: {
+        if (!field.nestedField.__isset.array_field ||
+            !field.nestedField.array_field.__isset.item_field) {
+            return column;
+        }
+        const auto* item_field = get_field_ptr(field.nestedField.array_field.item_field);
+        if (item_field == nullptr) {
+            return column;
+        }
+        const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type);
+        column.children.push_back(
+                build_schema_column_from_external_field(*item_field, array_type.get_nested_type()));
+        break;
+    }
+    case TYPE_MAP: {
+        if (!field.nestedField.__isset.map_field ||
+            !field.nestedField.map_field.__isset.key_field ||
+            !field.nestedField.map_field.__isset.value_field) {
+            return column;
+        }
+        const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type);
+        const auto* key_field = get_field_ptr(field.nestedField.map_field.key_field);
+        if (key_field != nullptr) {
+            column.children.push_back(
+                    build_schema_column_from_external_field(*key_field, map_type.get_key_type()));
+        }
+        const auto* value_field = get_field_ptr(field.nestedField.map_field.value_field);
+        if (value_field != nullptr) {
+            column.children.push_back(build_schema_column_from_external_field(
+                    *value_field, map_type.get_value_type()));
+        }
+        break;
+    }
+    default:
+        break;
+    }
+    return column;
+}
+
+const reader::TableColumn* find_schema_child_by_path(const reader::TableColumn* schema_column,
+                                                     const std::string& child_path) {
+    if (schema_column == nullptr) {
+        return nullptr;
+    }
+    int32_t parsed_field_id = -1;
+    if (parse_non_negative_int(child_path, &parsed_field_id)) {
+        const auto child_it = std::ranges::find_if(
+                schema_column->children,
+                [&](const reader::TableColumn& child) { return child.id == parsed_field_id; });
+        return child_it == schema_column->children.end() ? nullptr : &*child_it;
+    }
+    const auto child_it = std::ranges::find_if(schema_column->children, [&](const auto& child) {
+        return to_lower(child.name) == to_lower(child_path);
+    });
+    return child_it == schema_column->children.end() ? nullptr : &*child_it;
+}
+
+const schema::external::TField* find_external_root_field(const TFileScanRangeParams* params,
+                                                         const reader::TableColumn& column) {
+    if (params == nullptr || !params->__isset.history_schema_info ||
+        params->history_schema_info.empty()) {
+        return nullptr;
+    }
+    const auto* schema = &params->history_schema_info.front();
+    if (params->__isset.current_schema_id) {
+        for (const auto& candidate_schema : params->history_schema_info) {
+            if (candidate_schema.__isset.schema_id &&
+                candidate_schema.schema_id == params->current_schema_id) {
+                schema = &candidate_schema;
+                break;
+            }
+        }
+    }
+    if (!schema->__isset.root_field || !schema->root_field.__isset.fields) {
+        return nullptr;
+    }
+    for (const auto& field_ptr : schema->root_field.fields) {
+        const auto* field = get_field_ptr(field_ptr);
+        if (field == nullptr) {
+            continue;
+        }
+        if (field->__isset.id && field->id == column.id) {
+            return field;
+        }
+        if (field->__isset.name && to_lower(field->name) == to_lower(column.name)) {
+            return field;
+        }
+    }
+    return nullptr;
+}
+
+struct AccessPathNode {
+    bool project_all = false;
+    std::map<std::string, AccessPathNode> children;
+};
+
+void merge_access_path_node(AccessPathNode* dst, const AccessPathNode& src) {
+    DORIS_CHECK(dst != nullptr);
+    if (dst->project_all) {
+        return;
+    }
+    if (src.project_all) {
+        dst->project_all = true;
+        dst->children.clear();
+        return;
+    }
+    for (const auto& [path, child] : src.children) {
+        merge_access_path_node(&dst->children[path], child);
+    }
+}
+
+void insert_access_path(AccessPathNode* root, const std::vector<std::string>& path,
+                        size_t path_idx) {
+    DORIS_CHECK(root != nullptr);
+    if (root->project_all) {
+        return;
+    }
     if (path_idx >= path.size()) {
+        root->project_all = true;
+        root->children.clear();
+        return;
+    }
+    insert_access_path(&root->children[path[path_idx]], path, path_idx + 1);
+}
+
+Status build_nested_children_from_access_node(reader::TableColumn* column, const DataTypePtr& type,
+                                              const AccessPathNode& node, const std::string& path,
+                                              const reader::TableColumn* schema_column);
+
+Status build_struct_children_from_access_node(reader::TableColumn* column,
+                                              const DataTypeStruct& struct_type,
+                                              const AccessPathNode& node, const std::string& path,
+                                              const reader::TableColumn* schema_column) {
+    DORIS_CHECK(column != nullptr);
+    for (const auto& [child_path, child_node] : node.children) {
+        // Currently we do not support accessing struct children by position (e.g. "col.0") because it can be ambiguous and error-prone when the struct schema evolves. We only support accessing struct children by name (e.g. "col.child"). If needed, we can consider adding support for position-based access in the future with careful design and consideration.
+        if (child_path == "OFFSET" || child_path == "*" || child_path == "KEYS" ||
+            child_path == "VALUES") {
+            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+                                        path + "." + child_path, column->name);
+        }
+
+        // Try to find the child field in the schema column first. If not found, fallback to find the child field in the struct type by name (case-insensitive).
+        const auto* schema_child = find_schema_child_by_path(schema_column, child_path);
+        int32_t field_id = schema_child == nullptr ? -1 : schema_child->id;
+        std::string field_name = schema_child == nullptr ? child_path : schema_child->name;
+        DataTypePtr field_type = schema_child == nullptr ? nullptr : schema_child->type;
+        if (schema_child == nullptr) {
+            for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) {
+                if (to_lower(struct_type.get_element_name(field_idx)) == to_lower(field_name)) {
+                    field_id = cast_set<int32_t>(field_idx);
+                    field_name = struct_type.get_element_name(field_idx);
+                    field_type = struct_type.get_element(field_idx);
+                    break;
+                }
+            }
+        }
+
+        if (field_id < 0 || field_type == nullptr) {
+            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+                                        path + "." + child_path, column->name);
+        }
+        auto* child = find_or_add_child(column, field_id, field_name, field_type);
+        RETURN_IF_ERROR(build_nested_children_from_access_node(
+                child, child->type, child_node, path + "." + child_path, schema_child));
+    }
+    return Status::OK();
+}
+
+Status build_map_children_from_access_node(reader::TableColumn* column, const DataTypeMap& map_type,
+                                           const AccessPathNode& node, const std::string& path,
+                                           const reader::TableColumn* schema_column) {
+    DORIS_CHECK(column != nullptr);
+    AccessPathNode key_node;
+    AccessPathNode value_node;
+    bool need_key = false;
+    bool need_value = false;
+
+    for (const auto& [child_path, child_node] : node.children) {
+        if (child_path == "OFFSET") {
+            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+                                        path + "." + child_path, column->name);
+        }
+        if (child_path == "KEYS") {
+            need_key = true;
+            merge_access_path_node(&key_node, child_node);
+            continue;
+        }
+        if (child_path == "VALUES") {
+            need_key = true;
+            key_node.project_all = true;
+            key_node.children.clear();
+            need_value = true;
+            merge_access_path_node(&value_node, child_node);
+            continue;
+        }
+        if (child_path == "*") {
+            need_key = true;
+            key_node.project_all = true;
+            key_node.children.clear();
+            need_value = true;
+            merge_access_path_node(&value_node, child_node);
+            continue;
+        }
+        return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+                                    path + "." + child_path, column->name);
+    }
+    if (need_key && !need_value) {
+        // Keep value readable until the downstream map materialization path can construct a table
+        // Map column from keys only.
+        need_value = true;
+        value_node.project_all = true;
+        value_node.children.clear();
+    }
+
+    DataTypes entry_child_types;
+    Strings entry_child_names;
+    if (need_key) {
+        entry_child_types.push_back(map_type.get_key_type());
+        entry_child_names.push_back("key");
+    }
+    if (need_value) {
+        entry_child_types.push_back(map_type.get_value_type());
+        entry_child_names.push_back("value");
+    }
+    if (entry_child_types.empty()) {
         return Status::OK();
     }
-    if (path[path_idx] == "OFFSET") {
-        return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
-                                    access_path_to_string(path), column->name);
+
+    auto entry_type = std::make_shared<DataTypeStruct>(entry_child_types, entry_child_names);
+    auto* entry_child = find_or_add_child(column, 0, "entries", entry_type);
+    const auto* key_schema = schema_column != nullptr && !schema_column->children.empty()
+                                     ? &schema_column->children[0]
+                                     : nullptr;
+    const auto* value_schema = schema_column != nullptr && schema_column->children.size() > 1
+                                       ? &schema_column->children[1]
+                                       : nullptr;
+    if (need_key) {
+        auto* key_child = find_or_add_child(entry_child, 0, "key", map_type.get_key_type());
+        RETURN_IF_ERROR(build_nested_children_from_access_node(key_child, key_child->type, key_node,
+                                                               path + ".KEYS", key_schema));
+    }
+    if (need_value) {
+        auto* value_child = find_or_add_child(entry_child, 1, "value", map_type.get_value_type());
+        RETURN_IF_ERROR(build_nested_children_from_access_node(
+                value_child, value_child->type, value_node, path + ".VALUES", value_schema));
+    }
+    return Status::OK();
+}
+
+Status build_nested_children_from_access_node(reader::TableColumn* column, const DataTypePtr& type,
+                                              const AccessPathNode& node, const std::string& path,
+                                              const reader::TableColumn* schema_column) {
+    DORIS_CHECK(column != nullptr);
+    if (node.project_all || node.children.empty()) {
+        // If project_all is true or there is no specific child path, we need to project all children of the complex type.
+        return Status::OK();
     }
 
     const auto nested_type = remove_nullable(type);
     switch (nested_type->get_primitive_type()) {
     case TYPE_STRUCT:
-        return add_struct_access_path(column, assert_cast<const DataTypeStruct&>(*nested_type),
-                                      path, path_idx);
+        return build_struct_children_from_access_node(
+                column, assert_cast<const DataTypeStruct&>(*nested_type), node, path,
+                schema_column);
     case TYPE_ARRAY: {
+        if (node.children.size() != 1 || !node.children.contains("*")) {
+            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+                                        path, column->name);
+        }
         const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type);
         auto* child = find_or_add_child(column, 0, "element", array_type.get_nested_type());
-        return add_access_path(child, child->type, path, path_idx + 1);
+        const auto* element_schema = schema_column != nullptr && !schema_column->children.empty()
+                                             ? &schema_column->children[0]
+                                             : nullptr;
+        return build_nested_children_from_access_node(child, child->type, node.children.at("*"),
+                                                      path + ".*", element_schema);
     }
-    case TYPE_MAP: {
-        const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type);
-        if (path[path_idx] == "KEYS") {
-            return Status::NotSupported(
-                    "FileScannerV2 does not support key access path {} for slot {}",
-                    access_path_to_string(path), column->name);
-        }
-        if (path[path_idx] == "VALUES" || path[path_idx] == "*") {
-            auto entry_type = std::make_shared<DataTypeStruct>(
-                    DataTypes {map_type.get_value_type()}, Strings {"value"});
-            auto* entry_child = find_or_add_child(column, 0, "entries", entry_type);
-            auto* value_child =
-                    find_or_add_child(entry_child, 1, "value", map_type.get_value_type());
-            return add_access_path(value_child, value_child->type, path, path_idx + 1);
-        }
-        return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
-                                    access_path_to_string(path), column->name);
-    }
+    case TYPE_MAP:
+        return build_map_children_from_access_node(
+                column, assert_cast<const DataTypeMap&>(*nested_type), node, path, schema_column);
     default:
         return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
-                                    access_path_to_string(path), column->name);
+                                    path, column->name);
     }
 }
 
-Status add_struct_access_path(reader::TableColumn* column, const DataTypeStruct& struct_type,
-                              const std::vector<std::string>& path, size_t path_idx) {
-    DORIS_CHECK(column != nullptr);
-    DORIS_CHECK(path_idx < path.size());
-    int32_t field_id = -1;
-    std::string field_name = path[path_idx];
-    DataTypePtr field_type;
-    int32_t parsed_field_id = -1;
-    if (parse_non_negative_int(field_name, &parsed_field_id)) {
-        field_id = parsed_field_id;
-        if (parsed_field_id < static_cast<int32_t>(struct_type.get_elements().size())) {
-            field_name = struct_type.get_element_name(parsed_field_id);
-            field_type = struct_type.get_element(parsed_field_id);
-        }
-    } else if (const auto position = struct_type.try_get_position_by_name(field_name)) {
-        field_id = cast_set<int32_t>(*position);
-        field_type = struct_type.get_element(*position);
-    }
-
-    if (field_id < 0 || field_type == nullptr) {
-        return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
-                                    access_path_to_string(path), column->name);
-    }
-    auto* child = find_or_add_child(column, field_id, field_name, field_type);
-    return add_access_path(child, child->type, path, path_idx + 1);
-}
-
 Status build_nested_children_from_access_paths(reader::TableColumn* column,
-                                               const SlotDescriptor* slot_desc) {
+                                               const TColumnAccessPaths& access_paths,
+                                               const reader::TableColumn* schema_column) {
     DORIS_CHECK(column != nullptr);
-    DORIS_CHECK(slot_desc != nullptr);
-    if (!is_complex_type(slot_desc->type()->get_primitive_type())) {
-        return Status::OK();
-    }
-    if (slot_desc->all_access_paths().empty()) {
+    if (!is_complex_type(remove_nullable(column->type)->get_primitive_type())) {
+        DCHECK(access_paths.empty());
         return Status::OK();
     }
 
-    for (const auto& access_path : slot_desc->all_access_paths()) {
+    AccessPathNode root;
+    // Build tree for AccessPathNode.
+    // For example, for access paths ["a.b", "a.c", "d"], the tree will be:
+    // root
+    // ├── a
+    // │   ├── b
+    // │   └── c
+    // └── d
+    for (const auto& access_path : access_paths) {
+        // TODO: Support META access paths if needed. Currently FileScannerV2 only supports DATA access paths.
         if (access_path.type != TAccessPathType::DATA || !access_path.__isset.data_access_path) {
             return Status::NotSupported("FileScannerV2 only supports DATA access paths for slot {}",
                                         column->name);
         }
         const auto& path = access_path.data_access_path.path;
         if (path.empty()) {
-            return Status::NotSupported("FileScannerV2 found empty access path for slot {}",
-                                        column->name);
+            insert_access_path(&root, path, 0);
+            continue;
         }
         int32_t top_level_id = -1;
-        if (path.front() != column->name &&
+        if (to_lower(path.front()) != to_lower(column->name) &&
             (!parse_non_negative_int(path.front(), &top_level_id) || top_level_id != column->id)) {
             return Status::NotSupported("FileScannerV2 access path {} does not match slot {}",
                                         access_path_to_string(path), column->name);
         }
-        RETURN_IF_ERROR(add_access_path(column, column->type, path, 1));
+        insert_access_path(&root, path, 1);
     }
-    return Status::OK();
+    // Recursively build nested children for the column based on the AccessPathNode tree.
+    return build_nested_children_from_access_node(column, column->type, root, column->name,
+                                                  schema_column);
+}
+
+Status build_nested_children_from_access_paths(reader::TableColumn* column,
+                                               const SlotDescriptor* slot_desc,
+                                               const reader::TableColumn* schema_column) {
+    DORIS_CHECK(column != nullptr);
+    DORIS_CHECK(slot_desc != nullptr);
+    return build_nested_children_from_access_paths(column, slot_desc->all_access_paths(),
+                                                   schema_column);
 }
 
 } // namespace
 
+#ifdef BE_TEST
+Status FileScannerV2::TEST_build_nested_children_from_access_paths(
+        reader::TableColumn* column, const TColumnAccessPaths& access_paths) {
+    return build_nested_children_from_access_paths(column, access_paths, nullptr);
+}
+
+Status FileScannerV2::TEST_build_nested_children_from_access_paths(
+        reader::TableColumn* column, const TColumnAccessPaths& access_paths,
+        const reader::TableColumn* schema_column) {
+    return build_nested_children_from_access_paths(column, access_paths, schema_column);
+}
+#endif
+
 // TODO: Only support parquet format now
 bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) {
     return get_range_format_type(params, range) == TFileFormatType::FORMAT_PARQUET &&
@@ -344,7 +642,7 @@
             .io_ctx = _io_ctx,
             .runtime_state = _state,
             .scanner_profile = _local_state->scanner_profile(),
-            .allow_missing_columns = true, // TODO
+            .allow_missing_columns = false, // TODO
             .push_down_agg_type = _local_state->get_push_down_agg_type(),
             .profile = nullptr, // TODO
     }));
@@ -446,7 +744,14 @@
         }
         auto column = _build_table_column(it->second);
         RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr));
-        RETURN_IF_ERROR(build_nested_children_from_access_paths(&column, it->second));
+        std::optional<reader::TableColumn> schema_column;
+        if (const auto* schema_field = find_external_root_field(_params, column);
+            schema_field != nullptr) {
+            // If the column has a matching root field in the schema, use the schema field to build the column's nested children.
+            schema_column = build_schema_column_from_external_field(*schema_field, column.type);
+        }
+        RETURN_IF_ERROR(build_nested_children_from_access_paths(
+                &column, it->second, schema_column.has_value() ? &*schema_column : nullptr));
         if (is_partition_slot(slot_info)) {
             column.is_partition_key = true;
             _partition_slot_descs.emplace(column.name, it->second);
@@ -491,7 +796,11 @@
         if (it == _slot_id_to_desc.end()) {
             continue;
         }
-        (*predicates)[it->second->col_unique_id()] = slot_predicate_list;
+        (*predicates)[it->second->col_unique_id()] = {
+                reader::TableColumn {.id = it->second->col_unique_id(),
+                                     .name = it->second->col_name(),
+                                     .type = it->second->get_data_type_ptr()},
+                slot_predicate_list};
     }
     return Status::OK();
 }
diff --git a/be/src/exec/scan/file_scanner_v2.h b/be/src/exec/scan/file_scanner_v2.h
index a633cd7..bf8cbcf 100644
--- a/be/src/exec/scan/file_scanner_v2.h
+++ b/be/src/exec/scan/file_scanner_v2.h
@@ -32,6 +32,7 @@
 #include "exprs/vexpr_fwd.h"
 #include "format/reader/column_mapper.h"
 #include "format/reader/table_reader.h"
+#include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "io/io_common.h"
 #include "runtime/runtime_profile.h"
@@ -51,6 +52,13 @@
     static constexpr const char* NAME = "FileScannerV2";
 
     static bool is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range);
+#ifdef BE_TEST
+    static Status TEST_build_nested_children_from_access_paths(
+            reader::TableColumn* column, const std::vector<TColumnAccessPath>& access_paths);
+    static Status TEST_build_nested_children_from_access_paths(
+            reader::TableColumn* column, const std::vector<TColumnAccessPath>& access_paths,
+            const reader::TableColumn* schema_column);
+#endif
 
     FileScannerV2(RuntimeState* state, FileScanLocalState* parent, int64_t limit,
                   std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile,
diff --git a/be/src/exprs/vexpr.h b/be/src/exprs/vexpr.h
index 79f3485..8b7246e 100644
--- a/be/src/exprs/vexpr.h
+++ b/be/src/exprs/vexpr.h
@@ -269,7 +269,7 @@
     static ColumnPtr filter_column_with_selector(const ColumnPtr& origin_column,
                                                  const Selector* selector, size_t count) {
         if (selector == nullptr) {
-            DCHECK_EQ(origin_column->size(), count);
+            DCHECK_EQ(origin_column->size(), count) << origin_column->get_name();
             return origin_column;
         }
         DCHECK_EQ(count, selector->size());
diff --git a/be/src/exprs/vslot_ref.cpp b/be/src/exprs/vslot_ref.cpp
index 87aad6b..2904c2a 100644
--- a/be/src/exprs/vslot_ref.cpp
+++ b/be/src/exprs/vslot_ref.cpp
@@ -89,6 +89,7 @@
 Status VSlotRef::execute_column_impl(VExprContext* context, const Block* block,
                                      const Selector* selector, size_t count,
                                      ColumnPtr& result_column) const {
+    DCHECK(_open_finished);
     if (_column_id >= 0 && _column_id >= block->columns()) {
         return Status::Error<ErrorCode::INTERNAL_ERROR>(
                 "input block not contain slot column {}, column_id={}, block={}", *_column_name,
diff --git a/be/src/exprs/vslot_ref.h b/be/src/exprs/vslot_ref.h
index 9ed8531..5f81b4e 100644
--- a/be/src/exprs/vslot_ref.h
+++ b/be/src/exprs/vslot_ref.h
@@ -58,6 +58,7 @@
     int column_id() const { return _column_id; }
 
     MOCK_FUNCTION int slot_id() const { return _slot_id; }
+    int column_uniq_id() const { return _column_uniq_id; }
 
     bool equals(const VExpr& other) override;
 
diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp
index 0e00b22..400437c 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -1262,18 +1262,25 @@
         if (mode == TableColumnMappingMode::BY_FIELD_ID && field.id == table_column.id) {
             return &field;
         }
-        if (field.name == table_column.name) {
+        if (to_lower(field.name) == to_lower(table_column.name)) {
             return &field;
         }
     }
     return nullptr;
 }
 
-static std::vector<int32_t> filter_slot_ids(const TableFilter& table_filter) {
-    if (!table_filter.slot_ids.empty()) {
-        return table_filter.slot_ids;
+static const SchemaField* find_file_child_for_complex_wrapper(const TableColumn& table_child,
+                                                              const SchemaField& file_field,
+                                                              TableColumnMappingMode mode) {
+    const auto primitive_type = remove_nullable(file_field.type)->get_primitive_type();
+    if (primitive_type == TYPE_ARRAY || primitive_type == TYPE_MAP) {
+        if (file_field.children.empty()) {
+            return nullptr;
+        }
+        DORIS_CHECK(file_field.children.size() == 1);
+        return &file_field.children[0];
     }
-    return {};
+    return find_file_child_by_table_column(table_child, file_field.children, mode);
 }
 
 Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& projected_columns,
@@ -1381,16 +1388,18 @@
     file_request->column_predicate_filters.clear();
     // 1. Build referenced non-predicate columns
     for (const auto& table_column : projected_columns) {
-        auto* mapping = _find_mapping(table_column.id);
+        auto* mapping = _find_mapping(table_column);
         if (mapping != nullptr && mapping->field_id.has_value()) {
             // A file column can be read lazily as a non-predicate column only when it is not used
             // by row-level expression filters. Single-column ColumnPredicate filters are pruning
             // hints only and must not force row-level predicate materialization.
             bool used_by_filter = false;
             for (const auto& table_filter : table_filters) {
-                const auto slot_ids = filter_slot_ids(table_filter);
-                if (std::find(slot_ids.begin(), slot_ids.end(), table_column.id) !=
-                    slot_ids.end()) {
+                const auto columns = table_filter.column_unique_ids;
+                if (std::find_if(columns.begin(), columns.end(),
+                                 [&](const TableColumn& col) -> bool {
+                                     return table_column.id == col.id;
+                                 }) != columns.end()) {
                     used_by_filter = true;
                     break;
                 }
@@ -1409,12 +1418,44 @@
             continue;
         }
         auto position_it = file_request->column_positions.find(*mapping.field_id);
-        DORIS_CHECK(position_it != file_request->column_positions.end());
+        DORIS_CHECK(position_it != file_request->column_positions.end())
+                << file_request->column_positions.size() << " " << *mapping.field_id << " "
+                << mapping.file_column_name;
         rebuild_projection(&mapping, position_it->second);
     }
     return Status::OK();
 }
 
+ColumnMapping* TableColumnMapper::_find_mapping(const TableColumn& table_column) {
+    for (auto& mapping : _mappings) {
+        if ((_options.mode == TableColumnMappingMode::BY_FIELD_ID ||
+             _options.mode == TableColumnMappingMode::BY_INDEX) &&
+            mapping.table_column_id == table_column.id) {
+            return &mapping;
+        }
+        if (_options.mode == TableColumnMappingMode::BY_NAME &&
+            to_lower(mapping.file_column_name) == to_lower(table_column.name)) {
+            return &mapping;
+        }
+    }
+    return nullptr;
+}
+
+const ColumnMapping* TableColumnMapper::_find_mapping(const TableColumn& table_column) const {
+    for (const auto& mapping : _mappings) {
+        if ((_options.mode == TableColumnMappingMode::BY_FIELD_ID ||
+             _options.mode == TableColumnMappingMode::BY_INDEX) &&
+            mapping.table_column_id == table_column.id) {
+            return &mapping;
+        }
+        if (_options.mode == TableColumnMappingMode::BY_NAME &&
+            to_lower(mapping.file_column_name) == to_lower(table_column.name)) {
+            return &mapping;
+        }
+    }
+    return nullptr;
+}
+
 Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table_filters,
                                            const TableColumnPredicates& table_column_predicates,
                                            FileScanRequest* file_request) {
@@ -1423,7 +1464,7 @@
     FilterProjectionMap filter_projections;
     RETURN_IF_ERROR(build_filter_projection_map(table_filters, &_mappings, &filter_projections));
     for (const auto& table_filter : table_filters) {
-        for (const auto table_column_id : filter_slot_ids(table_filter)) {
+        for (const auto& table_column_id : table_filter.column_unique_ids) {
             auto* mapping = _find_mapping(table_column_id);
             if (mapping == nullptr || !mapping->field_id.has_value()) {
                 continue;
@@ -1445,13 +1486,13 @@
         }
     }
     for (const auto& [table_column_id, predicates] : table_column_predicates) {
-        const auto* mapping = _find_mapping(table_column_id);
-        if (mapping == nullptr || !mapping->field_id.has_value() || predicates.empty()) {
+        const auto* mapping = _find_mapping(predicates.first);
+        if (mapping == nullptr || !mapping->field_id.has_value() || predicates.second.empty()) {
             continue;
         }
         FileColumnPredicateFilter column_predicate_filter;
         column_predicate_filter.file_column_id = *mapping->field_id;
-        column_predicate_filter.predicates = predicates;
+        column_predicate_filter.predicates = predicates.second;
         file_request->column_predicate_filters.push_back(std::move(column_predicate_filter));
     }
     for (const auto& table_filter : table_filters) {
@@ -1477,7 +1518,7 @@
                 return &field;
             }
         }
-        if (field.name == table_column.name) {
+        if (to_lower(field.name) == to_lower(table_column.name)) {
             return &field;
         }
     }
@@ -1501,8 +1542,8 @@
 
     if (!table_column.children.empty()) {
         for (const auto& table_child : table_column.children) {
-            const auto* file_child = find_file_child_by_table_column(
-                    table_child, file_field.children, _options.mode);
+            const auto* file_child =
+                    find_file_child_for_complex_wrapper(table_child, file_field, _options.mode);
             if (file_child == nullptr) {
                 if (!_options.allow_missing_columns) {
                     return Status::InvalidArgument(
diff --git a/be/src/format/reader/column_mapper.h b/be/src/format/reader/column_mapper.h
index 78ca20d..abf216f 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -41,7 +41,8 @@
 struct TableColumn;
 struct TableFilter;
 
-using TableColumnPredicates = std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>;
+using TableColumnPredicates =
+        std::map<int32_t, std::pair<TableColumn, std::vector<std::shared_ptr<ColumnPredicate>>>>;
 
 enum class TableColumnMappingMode {
     BY_FIELD_ID,
@@ -138,23 +139,8 @@
                                     const std::vector<SchemaField>& file_schema,
                                     ColumnMapping* mapping) const;
 
-    ColumnMapping* _find_mapping(int32_t table_column_id) {
-        for (auto& mapping : _mappings) {
-            if (mapping.table_column_id == table_column_id) {
-                return &mapping;
-            }
-        }
-        return nullptr;
-    }
-
-    const ColumnMapping* _find_mapping(int32_t table_column_id) const {
-        for (const auto& mapping : _mappings) {
-            if (mapping.table_column_id == table_column_id) {
-                return &mapping;
-            }
-        }
-        return nullptr;
-    }
+    ColumnMapping* _find_mapping(const TableColumn& table_column);
+    const ColumnMapping* _find_mapping(const TableColumn& table_column) const;
 
     bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr& file_type) const {
         DORIS_CHECK(table_type != nullptr);
diff --git a/be/src/format/reader/table/hive_reader.cpp b/be/src/format/reader/table/hive_reader.cpp
index 5b19779..ec3239d 100644
--- a/be/src/format/reader/table/hive_reader.cpp
+++ b/be/src/format/reader/table/hive_reader.cpp
@@ -56,11 +56,9 @@
         break;
     }
 
-    reader::TableColumnMapperOptions mapper_options;
-    mapper_options.mode = use_column_names ? reader::TableColumnMappingMode::BY_NAME
+    _mapper_options.mode = use_column_names ? reader::TableColumnMappingMode::BY_NAME
                                            : reader::TableColumnMappingMode::BY_INDEX;
-    mapper_options.allow_missing_columns = allow_missing_columns;
-    _data_reader.column_mapper = reader::TableColumnMapper(mapper_options);
+    _mapper_options.allow_missing_columns = allow_missing_columns;
     return Status::OK();
 }
 
diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp
index 2aebcad..3798a7e 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -39,16 +39,20 @@
 namespace doris::reader {
 namespace {
 
-void collect_table_slot_ids(const VExprSPtr& expr, std::set<int>* slot_ids) {
+void collect_table_column_unique_ids(const VExprSPtr& expr,
+                                     std::map<ColumnId, TableColumn>* column_unique_ids) {
     if (expr == nullptr) {
         return;
     }
     if (expr->is_slot_ref()) {
         const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
-        slot_ids->insert(slot_ref->slot_id());
+        column_unique_ids->insert(
+                {slot_ref->column_uniq_id(), TableColumn {.id = slot_ref->column_uniq_id(),
+                                                          .name = slot_ref->column_name(),
+                                                          .type = slot_ref->data_type()}});
     }
     for (const auto& child : expr->children()) {
-        collect_table_slot_ids(child, slot_ids);
+        collect_table_column_unique_ids(child, column_unique_ids);
     }
 }
 
@@ -57,13 +61,15 @@
     if (conjunct == nullptr) {
         return Status::OK();
     }
-    std::set<int> slot_ids;
-    collect_table_slot_ids(conjunct->root(), &slot_ids);
-    if (!slot_ids.empty()) {
+    std::map<ColumnId, TableColumn> columns;
+    collect_table_column_unique_ids(conjunct->root(), &columns);
+    if (!columns.empty()) {
         TableFilter table_filter;
         table_filter.conjunct = nullptr;
         RETURN_IF_ERROR(conjunct->clone(state, table_filter.conjunct));
-        table_filter.slot_ids.assign(slot_ids.begin(), slot_ids.end());
+        for (const auto& [column_id, column] : columns) {
+            table_filter.column_unique_ids.push_back(column);
+        }
         table_filters->push_back(std::move(table_filter));
     }
     return Status::OK();
@@ -152,10 +158,8 @@
     _projected_columns = std::move(options.projected_columns);
     _system_properties = create_system_properties(_scan_params);
     _profile = std::move(options.profile);
-    TableColumnMapperOptions mapper_options;
-    mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
-    mapper_options.allow_missing_columns = options.allow_missing_columns;
-    _data_reader.column_mapper = TableColumnMapper(mapper_options);
+    _mapper_options.mode = TableColumnMappingMode::BY_NAME;
+    _mapper_options.allow_missing_columns = options.allow_missing_columns;
     _conjuncts = std::move(options.conjuncts);
     _table_column_predicates = std::move(options.column_predicates);
     return Status::OK();
@@ -224,6 +228,7 @@
 }
 
 Status TableReader::prepare_split(const SplitReadOptions& options) {
+    _data_reader.column_mapper = TableColumnMapper(_mapper_options);
     _partition_values = std::move(options.partition_values);
     _current_task = std::make_unique<ScanTask>();
     _current_task->data_file = create_file_description(options.current_range);
diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h
index 9eb3215..6df2b8d 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -67,23 +67,19 @@
 // table/global schema 中的列视图。
 // Iceberg 场景下,id 默认对应 Iceberg field id。该结构不描述文件中的物理列。
 struct TableColumn {
-    ColumnId id = -1;
+    ColumnId id = -1; // column_unique_id
     std::string name;
     DataTypePtr type;
-    std::vector<TableColumn> children;
-    VExprContextSPtr default_expr;
+    std::vector<TableColumn> children {};
+    VExprContextSPtr default_expr = nullptr;
     bool is_partition_key = false;
 };
 
 // Row-level predicates on table/global schema. They are rewritten to file-local expressions when
 // possible, and remain the source of row-level filtering after localization.
 struct TableFilter {
-    // 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
     VExprContextSPtr conjunct;
-
-    // Table slot ids referenced by conjunct. A single expression filter may depend on multiple
-    // columns, while ColumnPredicate pruning still belongs to one concrete column.
-    std::vector<int32_t> slot_ids;
+    std::vector<TableColumn> column_unique_ids;
 };
 
 enum class TableFilterConversion {
@@ -809,6 +805,7 @@
     FileFormat _format;
     TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE;
     bool _aggregate_pushdown_tried = false;
+    TableColumnMapperOptions _mapper_options;
 
 private:
     static const SchemaField* _find_schema_field(const std::vector<SchemaField>& schema,
diff --git a/be/src/format/table/iceberg_reader_v2.h b/be/src/format/table/iceberg_reader_v2.h
index 32b4496..48aa652 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -45,6 +45,11 @@
 class IcebergTableReader : public reader::TableReader {
 public:
     ~IcebergTableReader() override = default;
+    Status init(reader::TableReadOptions&& options) override {
+        RETURN_IF_ERROR(reader::TableReader::init(std::move(options)));
+        _mapper_options.mode = reader::TableColumnMappingMode::BY_FIELD_ID;
+        return Status::OK();
+    }
 
     Status prepare_split(const reader::SplitReadOptions& options) override;
 
diff --git a/be/test/exec/scan/vfile_scanner_exception_test.cpp b/be/test/exec/scan/vfile_scanner_exception_test.cpp
index 912e3e1..4915411 100644
--- a/be/test/exec/scan/vfile_scanner_exception_test.cpp
+++ b/be/test/exec/scan/vfile_scanner_exception_test.cpp
@@ -18,10 +18,17 @@
 #include <gen_cpp/PlanNodes_types.h>
 #include <gtest/gtest.h>
 
+#include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "common/object_pool.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_map.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
 #include "cpp/sync_point.h"
 #include "exec/operator/file_scan_operator.h"
 #include "exec/scan/file_scanner.h"
@@ -36,6 +43,18 @@
 #include "runtime/user_function_cache.h"
 
 namespace doris {
+namespace {
+
+TColumnAccessPath data_access_path(std::vector<std::string> path) {
+    TColumnAccessPath access_path;
+    access_path.__set_type(TAccessPathType::DATA);
+    TDataAccessPath data_path;
+    data_path.__set_path(std::move(path));
+    access_path.__set_data_access_path(std::move(data_path));
+    return access_path;
+}
+
+} // namespace
 
 class TestSplitSourceConnectorStub : public SplitSourceConnector {
 private:
@@ -378,4 +397,107 @@
     EXPECT_TRUE(supported_split_source.all_scan_ranges_match(params, FileScannerV2::is_supported));
 }
 
+TEST(FileScannerV2Test, BuildNestedChildrenFromAccessPaths) {
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    const auto key_type = std::make_shared<DataTypeString>();
+    const auto value_type =
+            std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type}, Strings {"b", "c"});
+    reader::TableColumn column {
+            .id = 100, .name = "m", .type = std::make_shared<DataTypeMap>(key_type, value_type)};
+
+    std::vector<TColumnAccessPath> access_paths;
+    access_paths.push_back(data_access_path({"m", "KEYS"}));
+    access_paths.push_back(data_access_path({"m", "VALUES", "b"}));
+    access_paths.push_back(data_access_path({"m", "*", "c"}));
+    auto status =
+            FileScannerV2::TEST_build_nested_children_from_access_paths(&column, access_paths);
+    ASSERT_TRUE(status.ok()) << status;
+
+    ASSERT_EQ(column.children.size(), 1);
+    const auto& entries = column.children[0];
+    EXPECT_EQ(entries.id, 0);
+    EXPECT_EQ(entries.name, "entries");
+    ASSERT_EQ(entries.children.size(), 2);
+    EXPECT_EQ(entries.children[0].name, "key");
+    EXPECT_TRUE(entries.children[0].children.empty());
+    EXPECT_EQ(entries.children[1].name, "value");
+    ASSERT_EQ(entries.children[1].children.size(), 2);
+    EXPECT_EQ(entries.children[1].children[0].name, "b");
+    EXPECT_EQ(entries.children[1].children[1].name, "c");
+}
+
+TEST(FileScannerV2Test, BuildArrayStructChildrenFromAccessPaths) {
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    const auto element_type =
+            std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type}, Strings {"a", "b"});
+    reader::TableColumn column {
+            .id = 100,
+            .name = "arr",
+            .type = std::make_shared<DataTypeArray>(element_type),
+    };
+
+    std::vector<TColumnAccessPath> access_paths;
+    access_paths.push_back(data_access_path({"arr", "*", "a"}));
+    auto status =
+            FileScannerV2::TEST_build_nested_children_from_access_paths(&column, access_paths);
+    ASSERT_TRUE(status.ok()) << status;
+
+    ASSERT_EQ(column.children.size(), 1);
+    const auto& element = column.children[0];
+    EXPECT_EQ(element.id, 0);
+    EXPECT_EQ(element.name, "element");
+    ASSERT_EQ(element.children.size(), 1);
+    EXPECT_EQ(element.children[0].id, 0);
+    EXPECT_EQ(element.children[0].name, "a");
+}
+
+TEST(FileScannerV2Test, BuildStructChildrenFromFieldIdAccessPaths) {
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    const auto struct_type =
+            std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type}, Strings {"a", "b"});
+    reader::TableColumn column {
+            .id = 100,
+            .name = "s",
+            .type = struct_type,
+    };
+    reader::TableColumn schema_column {
+            .id = 100,
+            .name = "s",
+            .type = struct_type,
+            .children =
+                    {
+                            {.id = 101, .name = "a", .type = int_type},
+                            {.id = 205, .name = "b", .type = int_type},
+                    },
+    };
+
+    std::vector<TColumnAccessPath> access_paths;
+    access_paths.push_back(data_access_path({"100", "205"}));
+    auto status = FileScannerV2::TEST_build_nested_children_from_access_paths(&column, access_paths,
+                                                                              &schema_column);
+    ASSERT_TRUE(status.ok()) << status;
+
+    ASSERT_EQ(column.children.size(), 1);
+    EXPECT_EQ(column.children[0].id, 205);
+    EXPECT_EQ(column.children[0].name, "b");
+}
+
+TEST(FileScannerV2Test, BuildNestedChildrenKeepsTopLevelProjectionWhole) {
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    reader::TableColumn column {
+            .id = 100,
+            .name = "s",
+            .type = std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type},
+                                                     Strings {"a", "b"}),
+    };
+
+    std::vector<TColumnAccessPath> access_paths;
+    access_paths.push_back(data_access_path({"s"}));
+    access_paths.push_back(data_access_path({"s", "a"}));
+    auto status =
+            FileScannerV2::TEST_build_nested_children_from_access_paths(&column, access_paths);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_TRUE(column.children.empty());
+}
+
 } // namespace doris
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp
index 260e656..15e57e8 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -705,7 +705,7 @@
     filter_expr->add_child(TableLiteral::create_shared(a_type, Field::create_field<TYPE_INT>(5)));
     reader::TableFilter table_filter {
             .conjunct = VExprContext::create_shared(filter_expr),
-            .slot_ids = {100},
+            .column_unique_ids = {table_column},
     };
 
     reader::TableColumnMapperOptions options;
@@ -770,7 +770,7 @@
     filter_expr->add_child(TableLiteral::create_shared(id_type, Field::create_field<TYPE_INT>(5)));
     reader::TableFilter table_filter {
             .conjunct = VExprContext::create_shared(filter_expr),
-            .slot_ids = {100},
+            .column_unique_ids = {table_column},
     };
 
     reader::TableColumnMapperOptions options;
@@ -832,7 +832,7 @@
             a_type, {Field::create_field<TYPE_INT>(5), Field::create_field<TYPE_INT>(7)});
     reader::TableFilter table_filter {
             .conjunct = VExprContext::create_shared(filter_expr),
-            .slot_ids = {100},
+            .column_unique_ids = {table_column},
     };
 
     reader::TableColumnMapperOptions options;
@@ -887,7 +887,7 @@
             TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, "a"));
     reader::TableFilter table_filter {
             .conjunct = VExprContext::create_shared(filter_expr),
-            .slot_ids = {100},
+            .column_unique_ids = {table_column},
     };
 
     reader::TableColumnMapperOptions options;
@@ -956,7 +956,7 @@
                               {Field::create_field<TYPE_INT>(5), Field::create_field<TYPE_INT>(7)});
     reader::TableFilter table_filter {
             .conjunct = VExprContext::create_shared(filter_expr),
-            .slot_ids = {100},
+            .column_unique_ids = {table_column},
     };
 
     reader::TableColumnMapperOptions options;
@@ -1012,7 +1012,7 @@
     filter_expr->add_child(TableLiteral::create_shared(a_type, Field::create_field<TYPE_INT>(5)));
     reader::TableFilter table_filter {
             .conjunct = VExprContext::create_shared(filter_expr),
-            .slot_ids = {100},
+            .column_unique_ids = {table_column},
     };
 
     reader::TableColumnMapperOptions options;
@@ -1073,7 +1073,7 @@
     filter_expr->add_child(right);
     reader::TableFilter table_filter {
             .conjunct = VExprContext::create_shared(filter_expr),
-            .slot_ids = {100},
+            .column_unique_ids = {table_column},
     };
 
     reader::TableColumnMapperOptions options;
@@ -1200,8 +1200,10 @@
     ASSERT_TRUE(mapper.create_mapping({table_id, table_value}, {}, {id_field, value_field}).ok());
 
     reader::TableColumnPredicates column_predicates;
-    column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
-            0, "id", id_field.type, Field::create_field<TYPE_INT>(2), false));
+    column_predicates[0] = {
+            table_id,
+            {create_comparison_predicate<PredicateType::GT>(
+                    0, "id", id_field.type, Field::create_field<TYPE_INT>(2), false)}};
 
     auto request = std::make_unique<reader::FileScanRequest>();
     ASSERT_TRUE(mapper.create_scan_request({}, column_predicates, {table_id, table_value},
diff --git a/be/test/format/reader/expr/cast_test.cpp b/be/test/format/reader/expr/cast_test.cpp
index 24491d4..f2c814a 100644
--- a/be/test/format/reader/expr/cast_test.cpp
+++ b/be/test/format/reader/expr/cast_test.cpp
@@ -340,7 +340,7 @@
     predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest file_request;
     ASSERT_TRUE(
@@ -400,7 +400,7 @@
             TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(15)));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest file_request;
     ASSERT_TRUE(
@@ -459,7 +459,7 @@
     predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest file_request;
     ASSERT_TRUE(
@@ -519,7 +519,7 @@
             TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(22)));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest file_request;
     ASSERT_TRUE(
@@ -563,7 +563,7 @@
                                                      Field::create_field<TYPE_STRING>("bad")));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest file_request;
     ASSERT_TRUE(
@@ -600,7 +600,7 @@
             TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(22)));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::SchemaField int_file_field;
     int_file_field.id = 0;
@@ -665,7 +665,7 @@
                                                      Field::create_field<TYPE_STRING>("bad")));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest file_request;
     ASSERT_TRUE(
@@ -698,7 +698,7 @@
             TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(15)));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::SchemaField int_file_field;
     int_file_field.id = 0;
@@ -761,7 +761,7 @@
             TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(15)));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest file_request;
     ASSERT_TRUE(
@@ -801,7 +801,7 @@
     predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest first_request;
     ASSERT_TRUE(
@@ -832,7 +832,7 @@
     predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::SchemaField int_file_field;
     int_file_field.id = 0;
@@ -905,7 +905,7 @@
     predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
     reader::TableFilter table_filter;
     table_filter.conjunct = VExprContext::create_shared(predicate);
-    table_filter.slot_ids = {7};
+    table_filter.column_unique_ids = {table_column};
 
     reader::FileScanRequest first_request;
     ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, &first_request).ok());
diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp
index 3685568..b8b83a6 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -77,7 +77,7 @@
 public:
     TableInt32GreaterThanExpr(int slot_id, int column_id, int32_t value)
             : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
-        add_child(TableSlotRef::create_shared(slot_id, column_id, -1,
+        add_child(TableSlotRef::create_shared(slot_id, column_id, slot_id,
                                               std::make_shared<DataTypeInt32>(), "id"));
         set_node_type(TExprNodeType::BINARY_PRED);
         _opcode = TExprOpcode::GT;
@@ -163,9 +163,9 @@
     TableInt32SumGreaterThanExpr(int left_slot_id, int left_column_id, int right_slot_id,
                                  int right_column_id, int32_t value)
             : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
-        add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1,
+        add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, left_slot_id,
                                               std::make_shared<DataTypeInt32>(), "id"));
-        add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, -1,
+        add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, right_slot_id,
                                               std::make_shared<DataTypeInt32>(), "score"));
         set_node_type(TExprNodeType::BINARY_PRED);
         _opcode = TExprOpcode::GT;
@@ -203,9 +203,9 @@
     TableInt32SumLessThanExpr(int left_slot_id, int left_column_id, int right_slot_id,
                               int right_column_id, int32_t value)
             : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
-        add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1,
+        add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, left_slot_id,
                                               std::make_shared<DataTypeInt32>(), "id"));
-        add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, -1,
+        add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, right_slot_id,
                                               std::make_shared<DataTypeInt32>(), "score"));
         set_node_type(TExprNodeType::BINARY_PRED);
         _opcode = TExprOpcode::LT;
@@ -389,33 +389,36 @@
 
 void write_list_struct_parquet_file(const std::string& file_path) {
     auto struct_type = arrow::struct_(
-            {arrow::field("a", arrow::int32(), false), arrow::field("b", arrow::utf8(), false)});
+            {arrow::field("a", arrow::int32(), false), arrow::field("b", arrow::int32(), false)});
     std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders;
     auto a_array_builder = std::make_unique<arrow::Int32Builder>();
     field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(a_array_builder)));
-    auto b_array_builder = std::make_unique<arrow::StringBuilder>();
+    auto b_array_builder = std::make_unique<arrow::Int32Builder>();
     field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(b_array_builder)));
     auto struct_builder = std::make_shared<arrow::StructBuilder>(
             struct_type, arrow::default_memory_pool(), std::move(field_builders));
-    auto list_type = arrow::list(arrow::field("element", struct_type, false));
+    auto list_type = arrow::list(arrow::field("element", struct_type, true));
     arrow::ListBuilder builder(arrow::default_memory_pool(), struct_builder, list_type);
     auto* a_builder = assert_cast<arrow::Int32Builder*>(struct_builder->field_builder(0));
-    auto* b_builder = assert_cast<arrow::StringBuilder*>(struct_builder->field_builder(1));
+    auto* b_builder = assert_cast<arrow::Int32Builder*>(struct_builder->field_builder(1));
 
     EXPECT_TRUE(builder.Append().ok());
     EXPECT_TRUE(struct_builder->Append().ok());
     EXPECT_TRUE(a_builder->Append(10).ok());
-    EXPECT_TRUE(b_builder->Append("la").ok());
+    EXPECT_TRUE(b_builder->Append(11).ok());
     EXPECT_TRUE(struct_builder->Append().ok());
     EXPECT_TRUE(a_builder->Append(20).ok());
-    EXPECT_TRUE(b_builder->Append("lb").ok());
+    EXPECT_TRUE(b_builder->Append(21).ok());
 
     EXPECT_TRUE(builder.Append().ok());
     EXPECT_TRUE(struct_builder->Append().ok());
     EXPECT_TRUE(a_builder->Append(30).ok());
-    EXPECT_TRUE(b_builder->Append("lc").ok());
+    EXPECT_TRUE(b_builder->Append(31).ok());
 
-    EXPECT_TRUE(builder.AppendEmptyValue().ok());
+    EXPECT_TRUE(builder.Append().ok());
+    EXPECT_TRUE(struct_builder->Append().ok());
+    EXPECT_TRUE(a_builder->Append(40).ok());
+    EXPECT_TRUE(b_builder->Append(41).ok());
 
     auto schema = arrow::schema({
             arrow::field("xs", list_type, false),
@@ -725,6 +728,22 @@
     return field;
 }
 
+void add_column_predicate(TableColumnPredicates* column_predicates, const TableColumn& column,
+                          std::shared_ptr<ColumnPredicate> predicate) {
+    auto& entry = (*column_predicates)[column.id];
+    entry.first = column;
+    entry.second.push_back(std::move(predicate));
+}
+
+VExprContextSPtr prepared_conjunct(RuntimeState* state, const VExprSPtr& expr) {
+    auto ctx = VExprContext::create_shared(expr);
+    auto status = ctx->prepare(state, RowDescriptor());
+    EXPECT_TRUE(status.ok()) << status;
+    status = ctx->open(state);
+    EXPECT_TRUE(status.ok()) << status;
+    return ctx;
+}
+
 TEST(TableReaderTest, ReopenSplitAfterClose) {
     const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_test";
     std::filesystem::remove_all(test_dir);
@@ -748,7 +767,8 @@
     ASSERT_TRUE(reader.init({
                                     .projected_columns = projected_columns,
                                     .column_predicates = {},
-                                    .conjuncts = {VExprContext::create_shared(
+                                    .conjuncts = {prepared_conjunct(
+                                            &state,
                                             std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0))},
                                     .format = FileFormat::PARQUET,
                                     .scan_params = nullptr,
@@ -984,13 +1004,10 @@
     const auto file_path = (test_dir / "split.parquet").string();
     write_list_struct_parquet_file(file_path);
 
-    const auto string_type = std::make_shared<DataTypeString>();
-    auto b_child = make_table_column(1, "b", string_type);
-    auto element_type = std::make_shared<DataTypeStruct>(DataTypes {string_type}, Strings {"b"});
-    auto element_child = make_table_column(0, "element", element_type);
-    element_child.children = {b_child};
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    auto element_type =
+            std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type}, Strings {"a", "b"});
     auto list_column = make_table_column(100, "xs", std::make_shared<DataTypeArray>(element_type));
-    list_column.children = {element_child};
     const std::vector<TableColumn> projected_columns = {list_column};
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
@@ -1013,19 +1030,31 @@
 
     Block block = build_table_block(projected_columns);
     bool eos = false;
-    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    auto status = reader.get_block(&block, &eos);
+    ASSERT_TRUE(status.ok()) << status;
     ASSERT_FALSE(eos);
     ASSERT_EQ(block.rows(), 3);
     const auto& array_result = assert_cast<const ColumnArray&>(*block.get_by_position(0).column);
     EXPECT_EQ(array_result.get_offsets()[0], 2);
     EXPECT_EQ(array_result.get_offsets()[1], 3);
-    EXPECT_EQ(array_result.get_offsets()[2], 3);
-    const auto& element_struct = assert_cast<const ColumnStruct&>(array_result.get_data());
-    ASSERT_EQ(element_struct.get_columns().size(), 1);
-    const auto& b_values = assert_cast<const ColumnString&>(element_struct.get_column(0));
-    EXPECT_EQ(b_values.get_data_at(0).to_string(), "la");
-    EXPECT_EQ(b_values.get_data_at(1).to_string(), "lb");
-    EXPECT_EQ(b_values.get_data_at(2).to_string(), "lc");
+    EXPECT_EQ(array_result.get_offsets()[2], 4);
+    const auto& nullable_elements = assert_cast<const ColumnNullable&>(array_result.get_data());
+    for (const auto is_null : nullable_elements.get_null_map_data()) {
+        EXPECT_EQ(is_null, 0);
+    }
+    const auto& element_struct =
+            assert_cast<const ColumnStruct&>(nullable_elements.get_nested_column());
+    ASSERT_EQ(element_struct.get_columns().size(), 2);
+    const auto& a_values = assert_cast<const ColumnInt32&>(element_struct.get_column(0));
+    EXPECT_EQ(a_values.get_element(0), 10);
+    EXPECT_EQ(a_values.get_element(1), 20);
+    EXPECT_EQ(a_values.get_element(2), 30);
+    EXPECT_EQ(a_values.get_element(3), 40);
+    const auto& b_values = assert_cast<const ColumnInt32&>(element_struct.get_column(1));
+    EXPECT_EQ(b_values.get_element(0), 11);
+    EXPECT_EQ(b_values.get_element(1), 21);
+    EXPECT_EQ(b_values.get_element(2), 31);
+    EXPECT_EQ(b_values.get_element(3), 41);
 
     ASSERT_TRUE(reader.close().ok());
     std::filesystem::remove_all(test_dir);
@@ -1197,7 +1226,8 @@
     ASSERT_TRUE(reader.init({
                                     .projected_columns = projected_columns,
                                     .column_predicates = {},
-                                    .conjuncts = {VExprContext::create_shared(
+                                    .conjuncts = {prepared_conjunct(
+                                            &state,
                                             std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2))},
                                     .format = FileFormat::PARQUET,
                                     .scan_params = nullptr,
@@ -1236,8 +1266,10 @@
     projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));
 
     TableColumnPredicates column_predicates;
-    column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
-            0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false));
+    add_column_predicate(&column_predicates, projected_columns[0],
+                         create_comparison_predicate<PredicateType::GT>(
+                                 0, "id", std::make_shared<DataTypeInt32>(),
+                                 Field::create_field<TYPE_INT>(2), false));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
@@ -1329,7 +1361,8 @@
     ASSERT_TRUE(reader.init({
                                     .projected_columns = projected_columns,
                                     .column_predicates = {},
-                                    .conjuncts = {VExprContext::create_shared(
+                                    .conjuncts = {prepared_conjunct(
+                                            &state,
                                             std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2))},
                                     .format = FileFormat::PARQUET,
                                     .scan_params = nullptr,
@@ -1359,21 +1392,22 @@
     ASSERT_TRUE(reader.close().ok());
 
     TableReader filtered_reader;
-    ASSERT_TRUE(filtered_reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = {},
-                                .conjuncts = {VExprContext::create_shared(
-                                        std::make_shared<TableInt32GreaterThanExpr>(0, 0, 4))},
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = true,
-                                .profile = nullptr,
-                        })
-                        .ok());
+    ASSERT_TRUE(
+            filtered_reader
+                    .init({
+                            .projected_columns = projected_columns,
+                            .column_predicates = {},
+                            .conjuncts = {prepared_conjunct(
+                                    &state, std::make_shared<TableInt32GreaterThanExpr>(0, 0, 4))},
+                            .format = FileFormat::PARQUET,
+                            .scan_params = nullptr,
+                            .io_ctx = nullptr,
+                            .runtime_state = &state,
+                            .scanner_profile = nullptr,
+                            .allow_missing_columns = true,
+                            .profile = nullptr,
+                    })
+                    .ok());
     ASSERT_TRUE(filtered_reader.prepare_split(build_split_options(file_path)).ok());
 
     block = build_table_block(projected_columns);
@@ -1402,8 +1436,10 @@
     projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));
 
     TableColumnPredicates column_predicates;
-    column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
-            0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false));
+    add_column_predicate(&column_predicates, projected_columns[1],
+                         create_comparison_predicate<PredicateType::GT>(
+                                 0, "id", std::make_shared<DataTypeInt32>(),
+                                 Field::create_field<TYPE_INT>(2), false));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
@@ -1456,8 +1492,10 @@
     projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));
 
     TableColumnPredicates column_predicates;
-    column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
-            0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false));
+    add_column_predicate(&column_predicates, projected_columns[0],
+                         create_comparison_predicate<PredicateType::GT>(
+                                 0, "id", std::make_shared<DataTypeInt32>(),
+                                 Field::create_field<TYPE_INT>(2), false));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
@@ -1520,12 +1558,12 @@
     table_filters.push_back({
             .conjunct = VExprContext::create_shared(
                     std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1, 1, 1)),
-            .slot_ids = {0, 1},
+            .column_unique_ids = {projected_columns[0], projected_columns[1]},
     });
     table_filters.push_back({
             .conjunct = VExprContext::create_shared(
                     std::make_shared<TableInt32SumLessThanExpr>(0, 0, 2, 2, 3)),
-            .slot_ids = {0, 2},
+            .column_unique_ids = {projected_columns[0], projected_columns[2]},
     });
 
     FileScanRequest file_request;
@@ -1570,7 +1608,7 @@
     TableFilter table_filter {
             .conjunct = VExprContext::create_shared(
                     std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)),
-            .slot_ids = {0},
+            .column_unique_ids = {projected_columns[0]},
     };
 
     FileScanRequest file_request;
@@ -1584,6 +1622,76 @@
     EXPECT_EQ(file_request.column_positions.at(1), 0);
 }
 
+TEST(TableReaderTest, CreateScanRequestUsesColumnNameForByNamePredicateMapping) {
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    const std::vector<TableColumn> projected_columns = {
+            make_table_column(10, "id", int_type),
+            make_table_column(11, "score", int_type),
+    };
+    const std::vector<SchemaField> file_schema = {
+            {.id = 0, .name = "ID", .type = int_type, .children = {}, .column_type = DATA_COLUMN},
+            {.id = 1,
+             .name = "score",
+             .type = int_type,
+             .children = {},
+             .column_type = DATA_COLUMN},
+    };
+
+    TableColumnMapper mapper({.mode = TableColumnMappingMode::BY_NAME});
+    ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok());
+
+    TableFilter table_filter {
+            .conjunct = VExprContext::create_shared(
+                    std::make_shared<TableInt32GreaterThanExpr>(10, 10, 1)),
+            .column_unique_ids = {projected_columns[0]},
+    };
+
+    FileScanRequest file_request;
+    ASSERT_TRUE(
+            mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok());
+
+    EXPECT_EQ(projection_ids(file_request.predicate_columns), std::vector<ColumnId>({0}));
+    EXPECT_EQ(projection_ids(file_request.non_predicate_columns), std::vector<ColumnId>({1}));
+    ASSERT_EQ(file_request.conjuncts.size(), 1);
+    const auto* localized_slot = assert_cast<const TableSlotRef*>(
+            file_request.conjuncts[0]->root()->children()[0].get());
+    EXPECT_EQ(localized_slot->slot_id(), 10);
+    EXPECT_EQ(localized_slot->column_id(), 1);
+}
+
+TEST(TableReaderTest, ColumnPredicateFilterUsesColumnNameForByNameMapping) {
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    const std::vector<TableColumn> projected_columns = {
+            make_table_column(10, "id", int_type),
+            make_table_column(11, "score", int_type),
+    };
+    const std::vector<SchemaField> file_schema = {
+            {.id = 0, .name = "ID", .type = int_type, .children = {}, .column_type = DATA_COLUMN},
+            {.id = 1,
+             .name = "score",
+             .type = int_type,
+             .children = {},
+             .column_type = DATA_COLUMN},
+    };
+
+    TableColumnMapper mapper({.mode = TableColumnMappingMode::BY_NAME});
+    ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok());
+
+    TableColumnPredicates column_predicates;
+    add_column_predicate(&column_predicates, projected_columns[0],
+                         create_comparison_predicate<PredicateType::GT>(
+                                 10, "id", int_type, Field::create_field<TYPE_INT>(2), false));
+
+    FileScanRequest file_request;
+    ASSERT_TRUE(mapper.create_scan_request({}, column_predicates, projected_columns, &file_request)
+                        .ok());
+
+    ASSERT_EQ(file_request.column_predicate_filters.size(), 1);
+    EXPECT_EQ(file_request.column_predicate_filters[0].file_column_id, 0);
+    EXPECT_EQ(projection_ids(file_request.non_predicate_columns), std::vector<ColumnId>({0, 1}));
+    EXPECT_TRUE(file_request.predicate_columns.empty());
+}
+
 TEST(TableReaderTest, OpenReaderPushesMultiColumnConjunctToParquetReader) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / "doris_table_reader_multi_conjunct_test";
@@ -1600,22 +1708,21 @@
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(
-            reader
-                    .init({
-                            .projected_columns = projected_columns,
-                            .column_predicates = {},
-                            .conjuncts = {VExprContext::create_shared(
-                                    std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1, 1, 8))},
-                            .format = FileFormat::PARQUET,
-                            .scan_params = nullptr,
-                            .io_ctx = nullptr,
-                            .runtime_state = &state,
-                            .scanner_profile = nullptr,
-                            .allow_missing_columns = true,
-                            .profile = nullptr,
-                    })
-                    .ok());
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = {prepared_conjunct(
+                                            &state, std::make_shared<TableInt32SumGreaterThanExpr>(
+                                                            0, 0, 1, 1, 8))},
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
+                        .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
 
@@ -1854,7 +1961,8 @@
     ASSERT_TRUE(reader.init({
                                     .projected_columns = projected_columns,
                                     .column_predicates = {},
-                                    .conjuncts = {VExprContext::create_shared(
+                                    .conjuncts = {prepared_conjunct(
+                                            &state,
                                             std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1))},
                                     .format = FileFormat::PARQUET,
                                     .scan_params = nullptr,
@@ -1909,7 +2017,8 @@
     ASSERT_TRUE(reader.init({
                                     .projected_columns = projected_columns,
                                     .column_predicates = {},
-                                    .conjuncts = {VExprContext::create_shared(
+                                    .conjuncts = {prepared_conjunct(
+                                            &state,
                                             std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1))},
                                     .format = FileFormat::PARQUET,
                                     .scan_params = nullptr,
@@ -1962,8 +2071,10 @@
     projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));
 
     TableColumnPredicates column_predicates;
-    column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
-            0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false));
+    add_column_predicate(&column_predicates, projected_columns[2],
+                         create_comparison_predicate<PredicateType::GT>(
+                                 0, "id", std::make_shared<DataTypeInt32>(),
+                                 Field::create_field<TYPE_INT>(2), false));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     doris::iceberg::IcebergTableReader reader;
@@ -2699,10 +2810,8 @@
     write_parquet_file(file_path, 7, "seven");
 
     std::vector<TableColumn> projected_columns;
-    projected_columns.push_back(
-            make_table_column(0, "table_id", std::make_shared<DataTypeInt64>()));
-    projected_columns.push_back(
-            make_table_column(1, "table_value", std::make_shared<DataTypeString>()));
+    projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt64>()));
+    projected_columns.push_back(make_table_column(1, "value", std::make_shared<DataTypeString>()));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
@@ -2722,17 +2831,16 @@
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
 
-    // The table projection is intentionally different from the Parquet schema:
-    // field id 0 is requested as BIGINT instead of the file INT, so ColumnMapper should build a
-    // Cast expression; field id 1 has a different table name but the same type, so it should build
-    // a SlotRef projection. Both columns should still materialize in table schema order.
+    // The table projection requests id as BIGINT instead of the file INT, so ColumnMapper should
+    // build a Cast expression. The second field has the same type and should build a SlotRef
+    // projection. Both columns should still materialize in table schema order.
     Block block = build_table_block(projected_columns);
     bool eos = false;
     ASSERT_TRUE(reader.get_block(&block, &eos).ok());
     ASSERT_FALSE(eos);
 
-    ASSERT_EQ(block.get_by_position(0).name, "table_id");
-    ASSERT_EQ(block.get_by_position(1).name, "table_value");
+    ASSERT_EQ(block.get_by_position(0).name, "id");
+    ASSERT_EQ(block.get_by_position(1).name, "value");
     const auto& id_column = assert_cast<const ColumnInt64&>(*block.get_by_position(0).column);
     const auto& value_column = assert_cast<const ColumnString&>(*block.get_by_position(1).column);
     ASSERT_EQ(id_column.size(), 1);