Fix conjuncts (#64111)
diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp
index 5480dee..ed67c16 100644
--- a/be/src/exec/scan/file_scanner_v2.cpp
+++ b/be/src/exec/scan/file_scanner_v2.cpp
@@ -19,12 +19,14 @@
#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
+#include <gen_cpp/ExternalTableSchema_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <algorithm>
#include <charconv>
#include <map>
#include <memory>
+#include <optional>
#include <string>
#include <string_view>
#include <utility>
@@ -115,115 +117,411 @@
return &parent->children.back();
}
-Status add_struct_access_path(reader::TableColumn* column, const DataTypeStruct& struct_type,
- const std::vector<std::string>& path, size_t path_idx);
+// Returns the TField wrapped by a thrift TFieldPtr, or nullptr when the optional `field_ptr`
+// member is unset or holds no field.
+const schema::external::TField* get_field_ptr(const schema::external::TFieldPtr& field_ptr) {
+    const bool has_field = field_ptr.__isset.field_ptr && field_ptr.field_ptr != nullptr;
+    return has_field ? field_ptr.field_ptr.get() : nullptr;
+}
-Status add_access_path(reader::TableColumn* column, const DataTypePtr& type,
- const std::vector<std::string>& path, size_t path_idx) {
- DORIS_CHECK(column != nullptr);
+// Case-insensitive lookup of a struct element's type by element name.
+// Returns nullptr when `struct_type` has no element named `field_name`.
+DataTypePtr find_struct_child_type_by_name(const DataTypeStruct& struct_type,
+                                           const std::string& field_name) {
+    // Lower-case the probe once instead of once per element.
+    const auto lower_field_name = to_lower(field_name);
+    for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) {
+        if (to_lower(struct_type.get_element_name(field_idx)) == lower_field_name) {
+            return struct_type.get_element(field_idx);
+        }
+    }
+    return nullptr;
+}
+
+// Builds a reader::TableColumn view of the external schema field `field`, typed with the
+// table-side `type`. Nested children are attached only when `type` is non-null and the field
+// carries nestedField metadata:
+//   * STRUCT: one child per external child field whose name matches a struct element
+//     case-insensitively; unnamed or unmatched child fields are skipped.
+//   * ARRAY:  a single child built from the item field.
+//   * MAP:    exactly two children [key, value], or none. build_map_children_from_access_node
+//     reads them positionally (children[0] = key, children[1] = value), so the pair is kept
+//     all-or-nothing instead of letting a lone value field land at index 0.
+reader::TableColumn build_schema_column_from_external_field(const schema::external::TField& field,
+                                                            DataTypePtr type) {
+    reader::TableColumn column {
+            .id = field.__isset.id ? field.id : -1,
+            .name = field.__isset.name ? field.name : "",
+            .type = std::move(type),
+            .children = {},
+            .default_expr = nullptr,
+            .is_partition_key = false,
+    };
+    if (column.type == nullptr || !field.__isset.nestedField) {
+        return column;
+    }
+
+    const auto nested_type = remove_nullable(column.type);
+    switch (nested_type->get_primitive_type()) {
+    case TYPE_STRUCT: {
+        if (!field.nestedField.__isset.struct_field ||
+            !field.nestedField.struct_field.__isset.fields) {
+            return column;
+        }
+        const auto& struct_type = assert_cast<const DataTypeStruct&>(*nested_type);
+        for (const auto& child_ptr : field.nestedField.struct_field.fields) {
+            const auto* child_field = get_field_ptr(child_ptr);
+            if (child_field == nullptr || !child_field->__isset.name) {
+                continue;
+            }
+            auto child_type = find_struct_child_type_by_name(struct_type, child_field->name);
+            if (child_type == nullptr) {
+                continue;
+            }
+            column.children.push_back(
+                    build_schema_column_from_external_field(*child_field, std::move(child_type)));
+        }
+        break;
+    }
+    case TYPE_ARRAY: {
+        if (!field.nestedField.__isset.array_field ||
+            !field.nestedField.array_field.__isset.item_field) {
+            return column;
+        }
+        const auto* item_field = get_field_ptr(field.nestedField.array_field.item_field);
+        if (item_field == nullptr) {
+            return column;
+        }
+        const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type);
+        column.children.push_back(
+                build_schema_column_from_external_field(*item_field, array_type.get_nested_type()));
+        break;
+    }
+    case TYPE_MAP: {
+        if (!field.nestedField.__isset.map_field ||
+            !field.nestedField.map_field.__isset.key_field ||
+            !field.nestedField.map_field.__isset.value_field) {
+            return column;
+        }
+        const auto* key_field = get_field_ptr(field.nestedField.map_field.key_field);
+        const auto* value_field = get_field_ptr(field.nestedField.map_field.value_field);
+        if (key_field == nullptr || value_field == nullptr) {
+            // Keep the positional [key, value] layout all-or-nothing (see the header comment).
+            return column;
+        }
+        const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type);
+        column.children.push_back(
+                build_schema_column_from_external_field(*key_field, map_type.get_key_type()));
+        column.children.push_back(
+                build_schema_column_from_external_field(*value_field, map_type.get_value_type()));
+        break;
+    }
+    default:
+        break;
+    }
+    return column;
+}
+
+// Finds the child of `schema_column` addressed by one access-path component.
+// A non-negative integer component is first looked up as a field id. Any other component, or a
+// numeric one that matches no child id, is matched case-insensitively against child names.
+// Returns nullptr when `schema_column` is null or no child matches.
+const reader::TableColumn* find_schema_child_by_path(const reader::TableColumn* schema_column,
+                                                     const std::string& child_path) {
+    if (schema_column == nullptr) {
+        return nullptr;
+    }
+    const auto& children = schema_column->children;
+    int32_t parsed_field_id = -1;
+    if (parse_non_negative_int(child_path, &parsed_field_id)) {
+        const auto id_it = std::ranges::find_if(children, [&](const reader::TableColumn& child) {
+            return child.id == parsed_field_id;
+        });
+        if (id_it != children.end()) {
+            return &*id_it;
+        }
+    }
+    // Lower-case the probe once instead of once per child.
+    const auto lower_child_path = to_lower(child_path);
+    const auto name_it = std::ranges::find_if(children, [&](const reader::TableColumn& child) {
+        return to_lower(child.name) == lower_child_path;
+    });
+    return name_it == children.end() ? nullptr : &*name_it;
+}
+
+// Returns the root-level external field that corresponds to `column`. The lookup uses the
+// history schema whose schema_id equals params->current_schema_id, or the first history schema
+// when current_schema_id is unset or no schema carries that id. A field whose id equals
+// column.id is preferred over any name match. Failing that, the first field whose name equals
+// column.name case-insensitively is returned. Returns nullptr when no history schema or no
+// matching field is available.
+const schema::external::TField* find_external_root_field(const TFileScanRangeParams* params,
+                                                         const reader::TableColumn& column) {
+    if (params == nullptr || !params->__isset.history_schema_info ||
+        params->history_schema_info.empty()) {
+        return nullptr;
+    }
+    const auto* schema = &params->history_schema_info.front();
+    if (params->__isset.current_schema_id) {
+        for (const auto& candidate_schema : params->history_schema_info) {
+            if (candidate_schema.__isset.schema_id &&
+                candidate_schema.schema_id == params->current_schema_id) {
+                schema = &candidate_schema;
+                break;
+            }
+        }
+    }
+    if (!schema->__isset.root_field || !schema->root_field.__isset.fields) {
+        return nullptr;
+    }
+    const auto& root_fields = schema->root_field.fields;
+    // Pass 1: match by field id across all fields. A different field that shares the column's
+    // name and appears earlier must not shadow the id match.
+    for (const auto& field_ptr : root_fields) {
+        const auto* field = get_field_ptr(field_ptr);
+        if (field != nullptr && field->__isset.id && field->id == column.id) {
+            return field;
+        }
+    }
+    // Pass 2: case-insensitive name match.
+    const auto lower_column_name = to_lower(column.name);
+    for (const auto& field_ptr : root_fields) {
+        const auto* field = get_field_ptr(field_ptr);
+        if (field != nullptr && field->__isset.name &&
+            to_lower(field->name) == lower_column_name) {
+            return field;
+        }
+    }
+    return nullptr;
+}
+
+// Node of the prefix tree built from a slot's DATA access paths. `project_all` means the whole
+// subtree rooted here must be read. `children` maps the next path component (a field name or
+// id, or a token such as "*", "KEYS" or "VALUES") to its subtree.
+struct AccessPathNode {
+    bool project_all {false};
+    std::map<std::string, AccessPathNode> children {};
+};
+
+// Merges the access-path subtree `src` into `dst`. Three cases apply:
+//   * `dst` already projects everything: `src` is absorbed and nothing changes.
+//   * `src` projects everything: `dst` collapses to a whole-subtree projection.
+//   * Otherwise, children are merged recursively, and missing ones are created.
+void merge_access_path_node(AccessPathNode* dst, const AccessPathNode& src) {
+    DORIS_CHECK(dst != nullptr);
+    if (dst->project_all) {
+        return;
+    }
+    if (!src.project_all) {
+        for (const auto& entry : src.children) {
+            merge_access_path_node(&dst->children[entry.first], entry.second);
+        }
+        return;
+    }
+    dst->project_all = true;
+    dst->children.clear();
+}
+
+// Inserts the components path[path_idx..] into the prefix tree rooted at `root`. The node
+// reached when the path is exhausted is marked project_all, and its finer-grained children are
+// dropped. Descent stops early at any node that already projects everything.
+void insert_access_path(AccessPathNode* root, const std::vector<std::string>& path,
+                        size_t path_idx) {
+    DORIS_CHECK(root != nullptr);
+    for (AccessPathNode* node = root; !node->project_all; ++path_idx) {
if (path_idx >= path.size()) {
+            node->project_all = true;
+            node->children.clear();
+            return;
+        }
+        node = &node->children[path[path_idx]];
+    }
+}
+
+Status build_nested_children_from_access_node(reader::TableColumn* column, const DataTypePtr& type,
+                                              const AccessPathNode& node, const std::string& path,
+                                              const reader::TableColumn* schema_column);
+
+// Adds one child to STRUCT `column` for each child of `node`, then recurses into it. `path` is
+// the dotted access path of `column` and is used in error messages. Each child is resolved in
+// two steps:
+//   1. Against the schema view `schema_column` (via find_schema_child_by_path).
+//   2. If the schema has no such child, by case-insensitive name in `struct_type`; the element
+//      position then becomes the child id.
+// Numeric components are therefore treated as field ids or literal names, never as element
+// positions. Returns NotSupported for array/map tokens ("OFFSET", "*", "KEYS", "VALUES") and
+// for components that resolve to no struct element.
+Status build_struct_children_from_access_node(reader::TableColumn* column,
+                                              const DataTypeStruct& struct_type,
+                                              const AccessPathNode& node, const std::string& path,
+                                              const reader::TableColumn* schema_column) {
+    DORIS_CHECK(column != nullptr);
+    for (const auto& [child_path, child_node] : node.children) {
+        // These tokens address array elements / map entries and are invalid on a struct.
+        if (child_path == "OFFSET" || child_path == "*" || child_path == "KEYS" ||
+            child_path == "VALUES") {
+            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+                                        path + "." + child_path, column->name);
+        }
+
+        const auto* schema_child = find_schema_child_by_path(schema_column, child_path);
+        int32_t field_id = schema_child == nullptr ? -1 : schema_child->id;
+        std::string field_name = schema_child == nullptr ? child_path : schema_child->name;
+        DataTypePtr field_type = schema_child == nullptr ? nullptr : schema_child->type;
+        if (schema_child == nullptr) {
+            // Fallback: case-insensitive name match against the table-side struct type.
+            const auto lower_field_name = to_lower(field_name);
+            for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) {
+                if (to_lower(struct_type.get_element_name(field_idx)) == lower_field_name) {
+                    field_id = cast_set<int32_t>(field_idx);
+                    field_name = struct_type.get_element_name(field_idx);
+                    field_type = struct_type.get_element(field_idx);
+                    break;
+                }
+            }
+        }
+
+        if (field_id < 0 || field_type == nullptr) {
+            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+                                        path + "." + child_path, column->name);
+        }
+        auto* child = find_or_add_child(column, field_id, field_name, field_type);
+        RETURN_IF_ERROR(build_nested_children_from_access_node(
+                child, child->type, child_node, path + "." + child_path, schema_child));
+    }
+    return Status::OK();
+}
+
+// Adds the "entries" child of MAP `column` for the KEYS / VALUES / "*" children of `node`.
+// `path` is the dotted access path of `column` and is used in error messages. Coupling rules:
+//   * A value access ("VALUES" or "*") also reads the whole key.
+//   * A key-only access also reads the whole value (see below).
+// `schema_column` supplies the key/value schema views only when it has exactly the two
+// [key, value] children. Returns NotSupported for "OFFSET" or any other component.
+Status build_map_children_from_access_node(reader::TableColumn* column, const DataTypeMap& map_type,
+                                           const AccessPathNode& node, const std::string& path,
+                                           const reader::TableColumn* schema_column) {
+    DORIS_CHECK(column != nullptr);
+    AccessPathNode key_node;
+    AccessPathNode value_node;
+    bool need_key = false;
+    bool need_value = false;
+
+    for (const auto& [child_path, child_node] : node.children) {
+        if (child_path == "KEYS") {
+            need_key = true;
+            merge_access_path_node(&key_node, child_node);
+        } else if (child_path == "VALUES" || child_path == "*") {
+            // A value access reads the complete key alongside the requested value subtree.
+            need_key = true;
+            key_node.project_all = true;
+            key_node.children.clear();
+            need_value = true;
+            merge_access_path_node(&value_node, child_node);
+        } else {
+            // "OFFSET" and any other component are rejected.
+            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+                                        path + "." + child_path, column->name);
+        }
+    }
+    if (need_key && !need_value) {
+        // Keep value readable until the downstream map materialization path can construct a
+        // table Map column from keys only.
+        need_value = true;
+        value_node.project_all = true;
+        value_node.children.clear();
+    }
+
+    DataTypes entry_child_types;
+    Strings entry_child_names;
+    if (need_key) {
+        entry_child_types.push_back(map_type.get_key_type());
+        entry_child_names.push_back("key");
+    }
+    if (need_value) {
+        entry_child_types.push_back(map_type.get_value_type());
+        entry_child_names.push_back("value");
+    }
+    if (entry_child_types.empty()) {
return Status::OK();
}
- if (path[path_idx] == "OFFSET") {
- return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
- access_path_to_string(path), column->name);
+
+    auto entry_type = std::make_shared<DataTypeStruct>(entry_child_types, entry_child_names);
+    auto* entry_child = find_or_add_child(column, 0, "entries", entry_type);
+    // The schema view of a map is indexed positionally. Trust it only when both the key and the
+    // value child are present; otherwise a lone value child would be used as the key schema.
+    const bool has_key_value_schema =
+            schema_column != nullptr && schema_column->children.size() == 2;
+    const auto* key_schema = has_key_value_schema ? &schema_column->children[0] : nullptr;
+    const auto* value_schema = has_key_value_schema ? &schema_column->children[1] : nullptr;
+    if (need_key) {
+        auto* key_child = find_or_add_child(entry_child, 0, "key", map_type.get_key_type());
+        RETURN_IF_ERROR(build_nested_children_from_access_node(key_child, key_child->type, key_node,
+                                                               path + ".KEYS", key_schema));
+    }
+    if (need_value) {
+        auto* value_child = find_or_add_child(entry_child, 1, "value", map_type.get_value_type());
+        RETURN_IF_ERROR(build_nested_children_from_access_node(
+                value_child, value_child->type, value_node, path + ".VALUES", value_schema));
+    }
+    return Status::OK();
+}
+
+// Adds the nested children of `column` required by the access-path subtree `node`, dispatching
+// on the nullable-stripped `type`. `path` is the dotted access path of `column` and is used in
+// error messages. `schema_column` is the optional history-schema view of `column`.
+// A node that projects everything, or has no children, adds nothing. STRUCT and MAP are
+// delegated to their builders. ARRAY accepts only a single "*" child, which is forwarded to the
+// "element" child. Any other type returns NotSupported.
+Status build_nested_children_from_access_node(reader::TableColumn* column, const DataTypePtr& type,
+ const AccessPathNode& node, const std::string& path,
+ const reader::TableColumn* schema_column) {
+ DORIS_CHECK(column != nullptr);
+ if (node.project_all || node.children.empty()) {
+ // If project_all is true or there is no specific child path, we need to project all children of the complex type.
+ return Status::OK();
}
const auto nested_type = remove_nullable(type);
switch (nested_type->get_primitive_type()) {
case TYPE_STRUCT:
- return add_struct_access_path(column, assert_cast<const DataTypeStruct&>(*nested_type),
- path, path_idx);
+ return build_struct_children_from_access_node(
+ column, assert_cast<const DataTypeStruct&>(*nested_type), node, path,
+ schema_column);
case TYPE_ARRAY: {
+ // Arrays are only addressable through their elements ("*").
+ if (node.children.size() != 1 || !node.children.contains("*")) {
+ return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
+ path, column->name);
+ }
const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type);
auto* child = find_or_add_child(column, 0, "element", array_type.get_nested_type());
- return add_access_path(child, child->type, path, path_idx + 1);
+ const auto* element_schema = schema_column != nullptr && !schema_column->children.empty()
+ ? &schema_column->children[0]
+ : nullptr;
+ return build_nested_children_from_access_node(child, child->type, node.children.at("*"),
+ path + ".*", element_schema);
}
- case TYPE_MAP: {
- const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type);
- if (path[path_idx] == "KEYS") {
- return Status::NotSupported(
- "FileScannerV2 does not support key access path {} for slot {}",
- access_path_to_string(path), column->name);
- }
- if (path[path_idx] == "VALUES" || path[path_idx] == "*") {
- auto entry_type = std::make_shared<DataTypeStruct>(
- DataTypes {map_type.get_value_type()}, Strings {"value"});
- auto* entry_child = find_or_add_child(column, 0, "entries", entry_type);
- auto* value_child =
- find_or_add_child(entry_child, 1, "value", map_type.get_value_type());
- return add_access_path(value_child, value_child->type, path, path_idx + 1);
- }
- return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
- access_path_to_string(path), column->name);
- }
+ case TYPE_MAP:
+ return build_map_children_from_access_node(
+ column, assert_cast<const DataTypeMap&>(*nested_type), node, path, schema_column);
default:
return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
- access_path_to_string(path), column->name);
+ path, column->name);
}
}
-Status add_struct_access_path(reader::TableColumn* column, const DataTypeStruct& struct_type,
- const std::vector<std::string>& path, size_t path_idx) {
- DORIS_CHECK(column != nullptr);
- DORIS_CHECK(path_idx < path.size());
- int32_t field_id = -1;
- std::string field_name = path[path_idx];
- DataTypePtr field_type;
- int32_t parsed_field_id = -1;
- if (parse_non_negative_int(field_name, &parsed_field_id)) {
- field_id = parsed_field_id;
- if (parsed_field_id < static_cast<int32_t>(struct_type.get_elements().size())) {
- field_name = struct_type.get_element_name(parsed_field_id);
- field_type = struct_type.get_element(parsed_field_id);
- }
- } else if (const auto position = struct_type.try_get_position_by_name(field_name)) {
- field_id = cast_set<int32_t>(*position);
- field_type = struct_type.get_element(*position);
- }
-
- if (field_id < 0 || field_type == nullptr) {
- return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
- access_path_to_string(path), column->name);
- }
- auto* child = find_or_add_child(column, field_id, field_name, field_type);
- return add_access_path(child, child->type, path, path_idx + 1);
-}
-
+// Builds the nested children of complex-typed `column` from its DATA access paths. Each path must
+// start with the column's name (case-insensitive) or its id. The remaining components are merged
+// into an AccessPathNode tree, which is then applied to `column`, optionally guided by the
+// history-schema view `schema_column` (may be null). Returns NotSupported for non-DATA paths or
+// for paths that do not belong to `column`.
Status build_nested_children_from_access_paths(reader::TableColumn* column,
- const SlotDescriptor* slot_desc) {
+ const TColumnAccessPaths& access_paths,
+ const reader::TableColumn* schema_column) {
DORIS_CHECK(column != nullptr);
- DORIS_CHECK(slot_desc != nullptr);
- if (!is_complex_type(slot_desc->type()->get_primitive_type())) {
- return Status::OK();
- }
- if (slot_desc->all_access_paths().empty()) {
+ if (!is_complex_type(remove_nullable(column->type)->get_primitive_type())) {
+ // NOTE(review): the previous version returned OK for scalar slots whatever their access paths
+ // were. This DCHECK now aborts debug builds if a scalar slot carries access paths; confirm
+ // FE never sends them.
+ DCHECK(access_paths.empty());
return Status::OK();
}
- for (const auto& access_path : slot_desc->all_access_paths()) {
+ AccessPathNode root;
+ // Build the AccessPathNode prefix tree. The leading component of each path (the slot name or
+ // column id) is skipped, so for access paths ["col.a.b", "col.a.c", "col.d"] the tree is:
+ // root
+ // ├── a
+ // │ ├── b
+ // │ └── c
+ // └── d
+ for (const auto& access_path : access_paths) {
+ // TODO: Support META access paths if needed. Currently FileScannerV2 only supports DATA access paths.
if (access_path.type != TAccessPathType::DATA || !access_path.__isset.data_access_path) {
return Status::NotSupported("FileScannerV2 only supports DATA access paths for slot {}",
column->name);
}
const auto& path = access_path.data_access_path.path;
if (path.empty()) {
- return Status::NotSupported("FileScannerV2 found empty access path for slot {}",
- column->name);
+ // An empty path selects the whole column.
+ insert_access_path(&root, path, 0);
+ continue;
}
int32_t top_level_id = -1;
- if (path.front() != column->name &&
+ if (to_lower(path.front()) != to_lower(column->name) &&
(!parse_non_negative_int(path.front(), &top_level_id) || top_level_id != column->id)) {
return Status::NotSupported("FileScannerV2 access path {} does not match slot {}",
access_path_to_string(path), column->name);
}
- RETURN_IF_ERROR(add_access_path(column, column->type, path, 1));
+ insert_access_path(&root, path, 1);
}
- return Status::OK();
+ // Recursively build nested children for the column based on the AccessPathNode tree.
+ return build_nested_children_from_access_node(column, column->type, root, column->name,
+ schema_column);
+}
+
+// Convenience overload that reads the access paths from `slot_desc`.
+Status build_nested_children_from_access_paths(reader::TableColumn* column,
+ const SlotDescriptor* slot_desc,
+ const reader::TableColumn* schema_column) {
+ DORIS_CHECK(column != nullptr);
+ DORIS_CHECK(slot_desc != nullptr);
+ return build_nested_children_from_access_paths(column, slot_desc->all_access_paths(),
+ schema_column);
}
} // namespace
+#ifdef BE_TEST
+// Test-only entry points for the file-local build_nested_children_from_access_paths.
+// The two-argument overload builds children without a history-schema view.
+Status FileScannerV2::TEST_build_nested_children_from_access_paths(
+        reader::TableColumn* column, const TColumnAccessPaths& access_paths) {
+    return TEST_build_nested_children_from_access_paths(column, access_paths, nullptr);
+}
+
+Status FileScannerV2::TEST_build_nested_children_from_access_paths(
+        reader::TableColumn* column, const TColumnAccessPaths& access_paths,
+        const reader::TableColumn* schema_column) {
+    return build_nested_children_from_access_paths(column, access_paths, schema_column);
+}
+#endif
+
// TODO: Only support parquet format now
bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) {
return get_range_format_type(params, range) == TFileFormatType::FORMAT_PARQUET &&
@@ -344,7 +642,7 @@
.io_ctx = _io_ctx,
.runtime_state = _state,
.scanner_profile = _local_state->scanner_profile(),
- .allow_missing_columns = true, // TODO
+ .allow_missing_columns = false, // TODO
.push_down_agg_type = _local_state->get_push_down_agg_type(),
.profile = nullptr, // TODO
}));
@@ -446,7 +744,14 @@
}
auto column = _build_table_column(it->second);
RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr));
- RETURN_IF_ERROR(build_nested_children_from_access_paths(&column, it->second));
+ std::optional<reader::TableColumn> schema_column;
+ if (const auto* schema_field = find_external_root_field(_params, column);
+ schema_field != nullptr) {
+ // If the column has a matching root field in the schema, use the schema field to build the column's nested children.
+ schema_column = build_schema_column_from_external_field(*schema_field, column.type);
+ }
+ RETURN_IF_ERROR(build_nested_children_from_access_paths(
+ &column, it->second, schema_column.has_value() ? &*schema_column : nullptr));
if (is_partition_slot(slot_info)) {
column.is_partition_key = true;
_partition_slot_descs.emplace(column.name, it->second);
@@ -491,7 +796,11 @@
if (it == _slot_id_to_desc.end()) {
continue;
}
- (*predicates)[it->second->col_unique_id()] = slot_predicate_list;
+ (*predicates)[it->second->col_unique_id()] = {
+ reader::TableColumn {.id = it->second->col_unique_id(),
+ .name = it->second->col_name(),
+ .type = it->second->get_data_type_ptr()},
+ slot_predicate_list};
}
return Status::OK();
}
diff --git a/be/src/exec/scan/file_scanner_v2.h b/be/src/exec/scan/file_scanner_v2.h
index a633cd7..bf8cbcf 100644
--- a/be/src/exec/scan/file_scanner_v2.h
+++ b/be/src/exec/scan/file_scanner_v2.h
@@ -32,6 +32,7 @@
#include "exprs/vexpr_fwd.h"
#include "format/reader/column_mapper.h"
#include "format/reader/table_reader.h"
+#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "io/io_common.h"
#include "runtime/runtime_profile.h"
@@ -51,6 +52,13 @@
static constexpr const char* NAME = "FileScannerV2";
static bool is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range);
+#ifdef BE_TEST
+ // Test-only hooks that run file_scanner_v2.cpp's access-path -> nested TableColumn builder.
+ // The overload without `schema_column` passes no history-schema view.
+ static Status TEST_build_nested_children_from_access_paths(
+ reader::TableColumn* column, const std::vector<TColumnAccessPath>& access_paths);
+ static Status TEST_build_nested_children_from_access_paths(
+ reader::TableColumn* column, const std::vector<TColumnAccessPath>& access_paths,
+ const reader::TableColumn* schema_column);
+#endif
FileScannerV2(RuntimeState* state, FileScanLocalState* parent, int64_t limit,
std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile,
diff --git a/be/src/exprs/vexpr.h b/be/src/exprs/vexpr.h
index 79f3485..8b7246e 100644
--- a/be/src/exprs/vexpr.h
+++ b/be/src/exprs/vexpr.h
@@ -269,7 +269,7 @@
static ColumnPtr filter_column_with_selector(const ColumnPtr& origin_column,
const Selector* selector, size_t count) {
if (selector == nullptr) {
- DCHECK_EQ(origin_column->size(), count);
+ DCHECK_EQ(origin_column->size(), count) << origin_column->get_name();
return origin_column;
}
DCHECK_EQ(count, selector->size());
diff --git a/be/src/exprs/vslot_ref.cpp b/be/src/exprs/vslot_ref.cpp
index 87aad6b..2904c2a 100644
--- a/be/src/exprs/vslot_ref.cpp
+++ b/be/src/exprs/vslot_ref.cpp
@@ -89,6 +89,7 @@
Status VSlotRef::execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector, size_t count,
ColumnPtr& result_column) const {
+ DCHECK(_open_finished);
if (_column_id >= 0 && _column_id >= block->columns()) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"input block not contain slot column {}, column_id={}, block={}", *_column_name,
diff --git a/be/src/exprs/vslot_ref.h b/be/src/exprs/vslot_ref.h
index 9ed8531..5f81b4e 100644
--- a/be/src/exprs/vslot_ref.h
+++ b/be/src/exprs/vslot_ref.h
@@ -58,6 +58,7 @@
int column_id() const { return _column_id; }
MOCK_FUNCTION int slot_id() const { return _slot_id; }
+ // Returns `_column_uniq_id`. The table reader uses it as the TableColumn id of the columns
+ // referenced by conjuncts.
+ int column_uniq_id() const { return _column_uniq_id; }
bool equals(const VExpr& other) override;
diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp
index 0e00b22..400437c 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -1262,18 +1262,25 @@
if (mode == TableColumnMappingMode::BY_FIELD_ID && field.id == table_column.id) {
return &field;
}
- if (field.name == table_column.name) {
+ if (to_lower(field.name) == to_lower(table_column.name)) {
return &field;
}
}
return nullptr;
}
-static std::vector<int32_t> filter_slot_ids(const TableFilter& table_filter) {
- if (!table_filter.slot_ids.empty()) {
- return table_filter.slot_ids;
+// Returns the child of `file_field` that `table_child` maps to, or nullptr.
+// ARRAY and MAP file fields are treated as wrappers with a single child. That child is returned
+// positionally without consulting `table_child`. The result is nullptr if the wrapper has no
+// children, and more than one child fails the DORIS_CHECK. Any other type (e.g. STRUCT) is
+// matched through find_file_child_by_table_column according to `mode`.
+static const SchemaField* find_file_child_for_complex_wrapper(const TableColumn& table_child,
+ const SchemaField& file_field,
+ TableColumnMappingMode mode) {
+ const auto primitive_type = remove_nullable(file_field.type)->get_primitive_type();
+ if (primitive_type == TYPE_ARRAY || primitive_type == TYPE_MAP) {
+ if (file_field.children.empty()) {
+ return nullptr;
+ }
+ DORIS_CHECK(file_field.children.size() == 1);
+ return &file_field.children[0];
}
- return {};
+ return find_file_child_by_table_column(table_child, file_field.children, mode);
}
Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& projected_columns,
@@ -1381,16 +1388,18 @@
file_request->column_predicate_filters.clear();
// 1. Build referenced non-predicate columns
for (const auto& table_column : projected_columns) {
- auto* mapping = _find_mapping(table_column.id);
+ auto* mapping = _find_mapping(table_column);
if (mapping != nullptr && mapping->field_id.has_value()) {
// A file column can be read lazily as a non-predicate column only when it is not used
// by row-level expression filters. Single-column ColumnPredicate filters are pruning
// hints only and must not force row-level predicate materialization.
bool used_by_filter = false;
for (const auto& table_filter : table_filters) {
- const auto slot_ids = filter_slot_ids(table_filter);
- if (std::find(slot_ids.begin(), slot_ids.end(), table_column.id) !=
- slot_ids.end()) {
+ const auto columns = table_filter.column_unique_ids;
+ if (std::find_if(columns.begin(), columns.end(),
+ [&](const TableColumn& col) -> bool {
+ return table_column.id == col.id;
+ }) != columns.end()) {
used_by_filter = true;
break;
}
@@ -1409,12 +1418,44 @@
continue;
}
auto position_it = file_request->column_positions.find(*mapping.field_id);
- DORIS_CHECK(position_it != file_request->column_positions.end());
+ DORIS_CHECK(position_it != file_request->column_positions.end())
+ << file_request->column_positions.size() << " " << *mapping.field_id << " "
+ << mapping.file_column_name;
rebuild_projection(&mapping, position_it->second);
}
return Status::OK();
}
+// Looks up the mapping for `table_column`:
+//   * BY_FIELD_ID / BY_INDEX: the mapping's table_column_id must equal table_column.id.
+//   * BY_NAME: the mapping's file column name must equal table_column.name, ignoring case.
+// Returns nullptr when nothing matches, and for any other mapping mode.
+ColumnMapping* TableColumnMapper::_find_mapping(const TableColumn& table_column) {
+    // Share the lookup with the const overload. Casting away const is safe because this
+    // overload can only be called on a non-const mapper.
+    const auto* self = static_cast<const TableColumnMapper*>(this);
+    return const_cast<ColumnMapping*>(self->_find_mapping(table_column));
+}
+
+const ColumnMapping* TableColumnMapper::_find_mapping(const TableColumn& table_column) const {
+    const bool match_by_id = _options.mode == TableColumnMappingMode::BY_FIELD_ID ||
+                             _options.mode == TableColumnMappingMode::BY_INDEX;
+    const bool match_by_name = _options.mode == TableColumnMappingMode::BY_NAME;
+    if (!match_by_id && !match_by_name) {
+        return nullptr;
+    }
+    // Lower-case the table column name once instead of once per mapping.
+    std::string lower_table_name;
+    if (match_by_name) {
+        lower_table_name = to_lower(table_column.name);
+    }
+    for (const auto& mapping : _mappings) {
+        const bool matched = match_by_id
+                                     ? mapping.table_column_id == table_column.id
+                                     : to_lower(mapping.file_column_name) == lower_table_name;
+        if (matched) {
+            return &mapping;
+        }
+    }
+    return nullptr;
+}
+
Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table_filters,
const TableColumnPredicates& table_column_predicates,
FileScanRequest* file_request) {
@@ -1423,7 +1464,7 @@
FilterProjectionMap filter_projections;
RETURN_IF_ERROR(build_filter_projection_map(table_filters, &_mappings, &filter_projections));
for (const auto& table_filter : table_filters) {
- for (const auto table_column_id : filter_slot_ids(table_filter)) {
+ for (const auto& table_column_id : table_filter.column_unique_ids) {
auto* mapping = _find_mapping(table_column_id);
if (mapping == nullptr || !mapping->field_id.has_value()) {
continue;
@@ -1445,13 +1486,13 @@
}
}
for (const auto& [table_column_id, predicates] : table_column_predicates) {
- const auto* mapping = _find_mapping(table_column_id);
- if (mapping == nullptr || !mapping->field_id.has_value() || predicates.empty()) {
+ const auto* mapping = _find_mapping(predicates.first);
+ if (mapping == nullptr || !mapping->field_id.has_value() || predicates.second.empty()) {
continue;
}
FileColumnPredicateFilter column_predicate_filter;
column_predicate_filter.file_column_id = *mapping->field_id;
- column_predicate_filter.predicates = predicates;
+ column_predicate_filter.predicates = predicates.second;
file_request->column_predicate_filters.push_back(std::move(column_predicate_filter));
}
for (const auto& table_filter : table_filters) {
@@ -1477,7 +1518,7 @@
return &field;
}
}
- if (field.name == table_column.name) {
+ if (to_lower(field.name) == to_lower(table_column.name)) {
return &field;
}
}
@@ -1501,8 +1542,8 @@
if (!table_column.children.empty()) {
for (const auto& table_child : table_column.children) {
- const auto* file_child = find_file_child_by_table_column(
- table_child, file_field.children, _options.mode);
+ const auto* file_child =
+ find_file_child_for_complex_wrapper(table_child, file_field, _options.mode);
if (file_child == nullptr) {
if (!_options.allow_missing_columns) {
return Status::InvalidArgument(
diff --git a/be/src/format/reader/column_mapper.h b/be/src/format/reader/column_mapper.h
index 78ca20d..abf216f 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -41,7 +41,8 @@
struct TableColumn;
struct TableFilter;
-using TableColumnPredicates = std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>;
+// Single-column predicates keyed by table column id (FileScannerV2 keys them by the slot's
+// col_unique_id). Each entry carries two things: the TableColumn the predicates apply to, which
+// lets the mapper resolve it by name as well as by id, and the predicates themselves.
+using TableColumnPredicates =
+ std::map<int32_t, std::pair<TableColumn, std::vector<std::shared_ptr<ColumnPredicate>>>>;
enum class TableColumnMappingMode {
BY_FIELD_ID,
@@ -138,23 +139,8 @@
const std::vector<SchemaField>& file_schema,
ColumnMapping* mapping) const;
- ColumnMapping* _find_mapping(int32_t table_column_id) {
- for (auto& mapping : _mappings) {
- if (mapping.table_column_id == table_column_id) {
- return &mapping;
- }
- }
- return nullptr;
- }
-
- const ColumnMapping* _find_mapping(int32_t table_column_id) const {
- for (const auto& mapping : _mappings) {
- if (mapping.table_column_id == table_column_id) {
- return &mapping;
- }
- }
- return nullptr;
- }
+ // Finds the mapping for `table_column`. The match is by id in BY_FIELD_ID / BY_INDEX mode and
+ // by case-insensitive name in BY_NAME mode. Returns nullptr when nothing matches.
+ ColumnMapping* _find_mapping(const TableColumn& table_column);
+ const ColumnMapping* _find_mapping(const TableColumn& table_column) const;
bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr& file_type) const {
DORIS_CHECK(table_type != nullptr);
diff --git a/be/src/format/reader/table/hive_reader.cpp b/be/src/format/reader/table/hive_reader.cpp
index 5b19779..ec3239d 100644
--- a/be/src/format/reader/table/hive_reader.cpp
+++ b/be/src/format/reader/table/hive_reader.cpp
@@ -56,11 +56,9 @@
break;
}
- reader::TableColumnMapperOptions mapper_options;
- mapper_options.mode = use_column_names ? reader::TableColumnMappingMode::BY_NAME
+ _mapper_options.mode = use_column_names ? reader::TableColumnMappingMode::BY_NAME
: reader::TableColumnMappingMode::BY_INDEX;
- mapper_options.allow_missing_columns = allow_missing_columns;
- _data_reader.column_mapper = reader::TableColumnMapper(mapper_options);
+ _mapper_options.allow_missing_columns = allow_missing_columns;
return Status::OK();
}
diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp
index 2aebcad..3798a7e 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -39,16 +39,20 @@
namespace doris::reader {
namespace {
-void collect_table_slot_ids(const VExprSPtr& expr, std::set<int>* slot_ids) {
+// Recursively collects every slot reference in `expr` into `column_unique_ids`. Entries are keyed
+// by the slot's column unique id and describe the slot as a TableColumn (id, name, data type).
+// std::map::insert keeps the first entry for an id, so later slots with the same id are ignored.
+// NOTE(review): slots whose column unique id is unset (presumably -1) would all share one key
+// and collapse into a single entry. Confirm every slot reaching this point has a real unique id.
+void collect_table_column_unique_ids(const VExprSPtr& expr,
+ std::map<ColumnId, TableColumn>* column_unique_ids) {
if (expr == nullptr) {
return;
}
if (expr->is_slot_ref()) {
const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
- slot_ids->insert(slot_ref->slot_id());
+ column_unique_ids->insert(
+ {slot_ref->column_uniq_id(), TableColumn {.id = slot_ref->column_uniq_id(),
+ .name = slot_ref->column_name(),
+ .type = slot_ref->data_type()}});
}
for (const auto& child : expr->children()) {
- collect_table_slot_ids(child, slot_ids);
+ collect_table_column_unique_ids(child, column_unique_ids);
}
}
@@ -57,13 +61,15 @@
if (conjunct == nullptr) {
return Status::OK();
}
- std::set<int> slot_ids;
- collect_table_slot_ids(conjunct->root(), &slot_ids);
- if (!slot_ids.empty()) {
+ std::map<ColumnId, TableColumn> columns;
+ collect_table_column_unique_ids(conjunct->root(), &columns);
+ if (!columns.empty()) {
TableFilter table_filter;
table_filter.conjunct = nullptr;
RETURN_IF_ERROR(conjunct->clone(state, table_filter.conjunct));
- table_filter.slot_ids.assign(slot_ids.begin(), slot_ids.end());
+ for (const auto& [column_id, column] : columns) {
+ table_filter.column_unique_ids.push_back(column);
+ }
table_filters->push_back(std::move(table_filter));
}
return Status::OK();
@@ -152,10 +158,8 @@
_projected_columns = std::move(options.projected_columns);
_system_properties = create_system_properties(_scan_params);
_profile = std::move(options.profile);
- TableColumnMapperOptions mapper_options;
- mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
- mapper_options.allow_missing_columns = options.allow_missing_columns;
- _data_reader.column_mapper = TableColumnMapper(mapper_options);
+ _mapper_options.mode = TableColumnMappingMode::BY_NAME;
+ _mapper_options.allow_missing_columns = options.allow_missing_columns;
_conjuncts = std::move(options.conjuncts);
_table_column_predicates = std::move(options.column_predicates);
return Status::OK();
@@ -224,6 +228,7 @@
}
Status TableReader::prepare_split(const SplitReadOptions& options) {
+ _data_reader.column_mapper = TableColumnMapper(_mapper_options);
_partition_values = std::move(options.partition_values);
_current_task = std::make_unique<ScanTask>();
_current_task->data_file = create_file_description(options.current_range);
diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h
index 9eb3215..6df2b8d 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -67,23 +67,19 @@
// table/global schema 中的列视图。
// Iceberg 场景下,id 默认对应 Iceberg field id。该结构不描述文件中的物理列。
struct TableColumn {
- ColumnId id = -1;
+ ColumnId id = -1; // column_unique_id
std::string name;
DataTypePtr type;
- std::vector<TableColumn> children;
- VExprContextSPtr default_expr;
+ std::vector<TableColumn> children {};
+ VExprContextSPtr default_expr = nullptr;
bool is_partition_key = false;
};
// Row-level predicates on table/global schema. They are rewritten to file-local expressions when
// possible, and remain the source of row-level filtering after localization.
struct TableFilter {
- // 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
VExprContextSPtr conjunct;
-
- // Table slot ids referenced by conjunct. A single expression filter may depend on multiple
- // columns, while ColumnPredicate pruning still belongs to one concrete column.
- std::vector<int32_t> slot_ids;
+ std::vector<TableColumn> column_unique_ids;
};
enum class TableFilterConversion {
@@ -809,6 +805,7 @@
FileFormat _format;
TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE;
bool _aggregate_pushdown_tried = false;
+ TableColumnMapperOptions _mapper_options;
private:
static const SchemaField* _find_schema_field(const std::vector<SchemaField>& schema,
diff --git a/be/src/format/table/iceberg_reader_v2.h b/be/src/format/table/iceberg_reader_v2.h
index 32b4496..48aa652 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -45,6 +45,11 @@
class IcebergTableReader : public reader::TableReader {
public:
~IcebergTableReader() override = default;
+ Status init(reader::TableReadOptions&& options) override {
+ RETURN_IF_ERROR(reader::TableReader::init(std::move(options)));
+ _mapper_options.mode = reader::TableColumnMappingMode::BY_FIELD_ID;
+ return Status::OK();
+ }
Status prepare_split(const reader::SplitReadOptions& options) override;
diff --git a/be/test/exec/scan/vfile_scanner_exception_test.cpp b/be/test/exec/scan/vfile_scanner_exception_test.cpp
index 912e3e1..4915411 100644
--- a/be/test/exec/scan/vfile_scanner_exception_test.cpp
+++ b/be/test/exec/scan/vfile_scanner_exception_test.cpp
@@ -18,10 +18,17 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gtest/gtest.h>
+#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "common/object_pool.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_map.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
#include "cpp/sync_point.h"
#include "exec/operator/file_scan_operator.h"
#include "exec/scan/file_scanner.h"
@@ -36,6 +43,18 @@
#include "runtime/user_function_cache.h"
namespace doris {
+namespace {
+
+TColumnAccessPath data_access_path(std::vector<std::string> path) {
+ TColumnAccessPath access_path;
+ access_path.__set_type(TAccessPathType::DATA);
+ TDataAccessPath data_path;
+ data_path.__set_path(std::move(path));
+ access_path.__set_data_access_path(std::move(data_path));
+ return access_path;
+}
+
+} // namespace
class TestSplitSourceConnectorStub : public SplitSourceConnector {
private:
@@ -378,4 +397,107 @@
EXPECT_TRUE(supported_split_source.all_scan_ranges_match(params, FileScannerV2::is_supported));
}
+TEST(FileScannerV2Test, BuildNestedChildrenFromAccessPaths) {
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ const auto key_type = std::make_shared<DataTypeString>();
+ const auto value_type =
+ std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type}, Strings {"b", "c"});
+ reader::TableColumn column {
+ .id = 100, .name = "m", .type = std::make_shared<DataTypeMap>(key_type, value_type)};
+
+ std::vector<TColumnAccessPath> access_paths;
+ access_paths.push_back(data_access_path({"m", "KEYS"}));
+ access_paths.push_back(data_access_path({"m", "VALUES", "b"}));
+ access_paths.push_back(data_access_path({"m", "*", "c"}));
+ auto status =
+ FileScannerV2::TEST_build_nested_children_from_access_paths(&column, access_paths);
+ ASSERT_TRUE(status.ok()) << status;
+
+ ASSERT_EQ(column.children.size(), 1);
+ const auto& entries = column.children[0];
+ EXPECT_EQ(entries.id, 0);
+ EXPECT_EQ(entries.name, "entries");
+ ASSERT_EQ(entries.children.size(), 2);
+ EXPECT_EQ(entries.children[0].name, "key");
+ EXPECT_TRUE(entries.children[0].children.empty());
+ EXPECT_EQ(entries.children[1].name, "value");
+ ASSERT_EQ(entries.children[1].children.size(), 2);
+ EXPECT_EQ(entries.children[1].children[0].name, "b");
+ EXPECT_EQ(entries.children[1].children[1].name, "c");
+}
+
+TEST(FileScannerV2Test, BuildArrayStructChildrenFromAccessPaths) {
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ const auto element_type =
+ std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type}, Strings {"a", "b"});
+ reader::TableColumn column {
+ .id = 100,
+ .name = "arr",
+ .type = std::make_shared<DataTypeArray>(element_type),
+ };
+
+ std::vector<TColumnAccessPath> access_paths;
+ access_paths.push_back(data_access_path({"arr", "*", "a"}));
+ auto status =
+ FileScannerV2::TEST_build_nested_children_from_access_paths(&column, access_paths);
+ ASSERT_TRUE(status.ok()) << status;
+
+ ASSERT_EQ(column.children.size(), 1);
+ const auto& element = column.children[0];
+ EXPECT_EQ(element.id, 0);
+ EXPECT_EQ(element.name, "element");
+ ASSERT_EQ(element.children.size(), 1);
+ EXPECT_EQ(element.children[0].id, 0);
+ EXPECT_EQ(element.children[0].name, "a");
+}
+
+TEST(FileScannerV2Test, BuildStructChildrenFromFieldIdAccessPaths) {
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ const auto struct_type =
+ std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type}, Strings {"a", "b"});
+ reader::TableColumn column {
+ .id = 100,
+ .name = "s",
+ .type = struct_type,
+ };
+ reader::TableColumn schema_column {
+ .id = 100,
+ .name = "s",
+ .type = struct_type,
+ .children =
+ {
+ {.id = 101, .name = "a", .type = int_type},
+ {.id = 205, .name = "b", .type = int_type},
+ },
+ };
+
+ std::vector<TColumnAccessPath> access_paths;
+ access_paths.push_back(data_access_path({"100", "205"}));
+ auto status = FileScannerV2::TEST_build_nested_children_from_access_paths(&column, access_paths,
+ &schema_column);
+ ASSERT_TRUE(status.ok()) << status;
+
+ ASSERT_EQ(column.children.size(), 1);
+ EXPECT_EQ(column.children[0].id, 205);
+ EXPECT_EQ(column.children[0].name, "b");
+}
+
+TEST(FileScannerV2Test, BuildNestedChildrenKeepsTopLevelProjectionWhole) {
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ reader::TableColumn column {
+ .id = 100,
+ .name = "s",
+ .type = std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type},
+ Strings {"a", "b"}),
+ };
+
+ std::vector<TColumnAccessPath> access_paths;
+ access_paths.push_back(data_access_path({"s"}));
+ access_paths.push_back(data_access_path({"s", "a"}));
+ auto status =
+ FileScannerV2::TEST_build_nested_children_from_access_paths(&column, access_paths);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_TRUE(column.children.empty());
+}
+
} // namespace doris
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp
index 260e656..15e57e8 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -705,7 +705,7 @@
filter_expr->add_child(TableLiteral::create_shared(a_type, Field::create_field<TYPE_INT>(5)));
reader::TableFilter table_filter {
.conjunct = VExprContext::create_shared(filter_expr),
- .slot_ids = {100},
+ .column_unique_ids = {table_column},
};
reader::TableColumnMapperOptions options;
@@ -770,7 +770,7 @@
filter_expr->add_child(TableLiteral::create_shared(id_type, Field::create_field<TYPE_INT>(5)));
reader::TableFilter table_filter {
.conjunct = VExprContext::create_shared(filter_expr),
- .slot_ids = {100},
+ .column_unique_ids = {table_column},
};
reader::TableColumnMapperOptions options;
@@ -832,7 +832,7 @@
a_type, {Field::create_field<TYPE_INT>(5), Field::create_field<TYPE_INT>(7)});
reader::TableFilter table_filter {
.conjunct = VExprContext::create_shared(filter_expr),
- .slot_ids = {100},
+ .column_unique_ids = {table_column},
};
reader::TableColumnMapperOptions options;
@@ -887,7 +887,7 @@
TableSlotRef::create_shared(100, 100, -1, full_table_struct_type, "s"), a_type, "a"));
reader::TableFilter table_filter {
.conjunct = VExprContext::create_shared(filter_expr),
- .slot_ids = {100},
+ .column_unique_ids = {table_column},
};
reader::TableColumnMapperOptions options;
@@ -956,7 +956,7 @@
{Field::create_field<TYPE_INT>(5), Field::create_field<TYPE_INT>(7)});
reader::TableFilter table_filter {
.conjunct = VExprContext::create_shared(filter_expr),
- .slot_ids = {100},
+ .column_unique_ids = {table_column},
};
reader::TableColumnMapperOptions options;
@@ -1012,7 +1012,7 @@
filter_expr->add_child(TableLiteral::create_shared(a_type, Field::create_field<TYPE_INT>(5)));
reader::TableFilter table_filter {
.conjunct = VExprContext::create_shared(filter_expr),
- .slot_ids = {100},
+ .column_unique_ids = {table_column},
};
reader::TableColumnMapperOptions options;
@@ -1073,7 +1073,7 @@
filter_expr->add_child(right);
reader::TableFilter table_filter {
.conjunct = VExprContext::create_shared(filter_expr),
- .slot_ids = {100},
+ .column_unique_ids = {table_column},
};
reader::TableColumnMapperOptions options;
@@ -1200,8 +1200,10 @@
ASSERT_TRUE(mapper.create_mapping({table_id, table_value}, {}, {id_field, value_field}).ok());
reader::TableColumnPredicates column_predicates;
- column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
- 0, "id", id_field.type, Field::create_field<TYPE_INT>(2), false));
+ column_predicates[0] = {
+ table_id,
+ {create_comparison_predicate<PredicateType::GT>(
+ 0, "id", id_field.type, Field::create_field<TYPE_INT>(2), false)}};
auto request = std::make_unique<reader::FileScanRequest>();
ASSERT_TRUE(mapper.create_scan_request({}, column_predicates, {table_id, table_value},
diff --git a/be/test/format/reader/expr/cast_test.cpp b/be/test/format/reader/expr/cast_test.cpp
index 24491d4..f2c814a 100644
--- a/be/test/format/reader/expr/cast_test.cpp
+++ b/be/test/format/reader/expr/cast_test.cpp
@@ -340,7 +340,7 @@
predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest file_request;
ASSERT_TRUE(
@@ -400,7 +400,7 @@
TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(15)));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest file_request;
ASSERT_TRUE(
@@ -459,7 +459,7 @@
predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest file_request;
ASSERT_TRUE(
@@ -519,7 +519,7 @@
TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(22)));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest file_request;
ASSERT_TRUE(
@@ -563,7 +563,7 @@
Field::create_field<TYPE_STRING>("bad")));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest file_request;
ASSERT_TRUE(
@@ -600,7 +600,7 @@
TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(22)));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::SchemaField int_file_field;
int_file_field.id = 0;
@@ -665,7 +665,7 @@
Field::create_field<TYPE_STRING>("bad")));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest file_request;
ASSERT_TRUE(
@@ -698,7 +698,7 @@
TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(15)));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::SchemaField int_file_field;
int_file_field.id = 0;
@@ -761,7 +761,7 @@
TableLiteral::create_shared(table_column.type, Field::create_field<TYPE_BIGINT>(15)));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest file_request;
ASSERT_TRUE(
@@ -801,7 +801,7 @@
predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest first_request;
ASSERT_TRUE(
@@ -832,7 +832,7 @@
predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::SchemaField int_file_field;
int_file_field.id = 0;
@@ -905,7 +905,7 @@
predicate->add_child(TableSlotRef::create_shared(7, 7, -1, table_column.type, "value"));
reader::TableFilter table_filter;
table_filter.conjunct = VExprContext::create_shared(predicate);
- table_filter.slot_ids = {7};
+ table_filter.column_unique_ids = {table_column};
reader::FileScanRequest first_request;
ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, &first_request).ok());
diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp
index 3685568..b8b83a6 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -77,7 +77,7 @@
public:
TableInt32GreaterThanExpr(int slot_id, int column_id, int32_t value)
: VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
- add_child(TableSlotRef::create_shared(slot_id, column_id, -1,
+ add_child(TableSlotRef::create_shared(slot_id, column_id, slot_id,
std::make_shared<DataTypeInt32>(), "id"));
set_node_type(TExprNodeType::BINARY_PRED);
_opcode = TExprOpcode::GT;
@@ -163,9 +163,9 @@
TableInt32SumGreaterThanExpr(int left_slot_id, int left_column_id, int right_slot_id,
int right_column_id, int32_t value)
: VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
- add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1,
+ add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, left_slot_id,
std::make_shared<DataTypeInt32>(), "id"));
- add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, -1,
+ add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, right_slot_id,
std::make_shared<DataTypeInt32>(), "score"));
set_node_type(TExprNodeType::BINARY_PRED);
_opcode = TExprOpcode::GT;
@@ -203,9 +203,9 @@
TableInt32SumLessThanExpr(int left_slot_id, int left_column_id, int right_slot_id,
int right_column_id, int32_t value)
: VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
- add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1,
+ add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, left_slot_id,
std::make_shared<DataTypeInt32>(), "id"));
- add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, -1,
+ add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, right_slot_id,
std::make_shared<DataTypeInt32>(), "score"));
set_node_type(TExprNodeType::BINARY_PRED);
_opcode = TExprOpcode::LT;
@@ -389,33 +389,36 @@
void write_list_struct_parquet_file(const std::string& file_path) {
auto struct_type = arrow::struct_(
- {arrow::field("a", arrow::int32(), false), arrow::field("b", arrow::utf8(), false)});
+ {arrow::field("a", arrow::int32(), false), arrow::field("b", arrow::int32(), false)});
std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders;
auto a_array_builder = std::make_unique<arrow::Int32Builder>();
field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(a_array_builder)));
- auto b_array_builder = std::make_unique<arrow::StringBuilder>();
+ auto b_array_builder = std::make_unique<arrow::Int32Builder>();
field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(b_array_builder)));
auto struct_builder = std::make_shared<arrow::StructBuilder>(
struct_type, arrow::default_memory_pool(), std::move(field_builders));
- auto list_type = arrow::list(arrow::field("element", struct_type, false));
+ auto list_type = arrow::list(arrow::field("element", struct_type, true));
arrow::ListBuilder builder(arrow::default_memory_pool(), struct_builder, list_type);
auto* a_builder = assert_cast<arrow::Int32Builder*>(struct_builder->field_builder(0));
- auto* b_builder = assert_cast<arrow::StringBuilder*>(struct_builder->field_builder(1));
+ auto* b_builder = assert_cast<arrow::Int32Builder*>(struct_builder->field_builder(1));
EXPECT_TRUE(builder.Append().ok());
EXPECT_TRUE(struct_builder->Append().ok());
EXPECT_TRUE(a_builder->Append(10).ok());
- EXPECT_TRUE(b_builder->Append("la").ok());
+ EXPECT_TRUE(b_builder->Append(11).ok());
EXPECT_TRUE(struct_builder->Append().ok());
EXPECT_TRUE(a_builder->Append(20).ok());
- EXPECT_TRUE(b_builder->Append("lb").ok());
+ EXPECT_TRUE(b_builder->Append(21).ok());
EXPECT_TRUE(builder.Append().ok());
EXPECT_TRUE(struct_builder->Append().ok());
EXPECT_TRUE(a_builder->Append(30).ok());
- EXPECT_TRUE(b_builder->Append("lc").ok());
+ EXPECT_TRUE(b_builder->Append(31).ok());
- EXPECT_TRUE(builder.AppendEmptyValue().ok());
+ EXPECT_TRUE(builder.Append().ok());
+ EXPECT_TRUE(struct_builder->Append().ok());
+ EXPECT_TRUE(a_builder->Append(40).ok());
+ EXPECT_TRUE(b_builder->Append(41).ok());
auto schema = arrow::schema({
arrow::field("xs", list_type, false),
@@ -725,6 +728,22 @@
return field;
}
+void add_column_predicate(TableColumnPredicates* column_predicates, const TableColumn& column,
+ std::shared_ptr<ColumnPredicate> predicate) {
+ auto& entry = (*column_predicates)[column.id];
+ entry.first = column;
+ entry.second.push_back(std::move(predicate));
+}
+
+VExprContextSPtr prepared_conjunct(RuntimeState* state, const VExprSPtr& expr) {
+ auto ctx = VExprContext::create_shared(expr);
+ auto status = ctx->prepare(state, RowDescriptor());
+ EXPECT_TRUE(status.ok()) << status;
+ status = ctx->open(state);
+ EXPECT_TRUE(status.ok()) << status;
+ return ctx;
+}
+
TEST(TableReaderTest, ReopenSplitAfterClose) {
const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_test";
std::filesystem::remove_all(test_dir);
@@ -748,7 +767,8 @@
ASSERT_TRUE(reader.init({
.projected_columns = projected_columns,
.column_predicates = {},
- .conjuncts = {VExprContext::create_shared(
+ .conjuncts = {prepared_conjunct(
+ &state,
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0))},
.format = FileFormat::PARQUET,
.scan_params = nullptr,
@@ -984,13 +1004,10 @@
const auto file_path = (test_dir / "split.parquet").string();
write_list_struct_parquet_file(file_path);
- const auto string_type = std::make_shared<DataTypeString>();
- auto b_child = make_table_column(1, "b", string_type);
- auto element_type = std::make_shared<DataTypeStruct>(DataTypes {string_type}, Strings {"b"});
- auto element_child = make_table_column(0, "element", element_type);
- element_child.children = {b_child};
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ auto element_type =
+ std::make_shared<DataTypeStruct>(DataTypes {int_type, int_type}, Strings {"a", "b"});
auto list_column = make_table_column(100, "xs", std::make_shared<DataTypeArray>(element_type));
- list_column.children = {element_child};
const std::vector<TableColumn> projected_columns = {list_column};
RuntimeState state {TQueryOptions(), TQueryGlobals()};
@@ -1013,19 +1030,31 @@
Block block = build_table_block(projected_columns);
bool eos = false;
- ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ auto status = reader.get_block(&block, &eos);
+ ASSERT_TRUE(status.ok()) << status;
ASSERT_FALSE(eos);
ASSERT_EQ(block.rows(), 3);
const auto& array_result = assert_cast<const ColumnArray&>(*block.get_by_position(0).column);
EXPECT_EQ(array_result.get_offsets()[0], 2);
EXPECT_EQ(array_result.get_offsets()[1], 3);
- EXPECT_EQ(array_result.get_offsets()[2], 3);
- const auto& element_struct = assert_cast<const ColumnStruct&>(array_result.get_data());
- ASSERT_EQ(element_struct.get_columns().size(), 1);
- const auto& b_values = assert_cast<const ColumnString&>(element_struct.get_column(0));
- EXPECT_EQ(b_values.get_data_at(0).to_string(), "la");
- EXPECT_EQ(b_values.get_data_at(1).to_string(), "lb");
- EXPECT_EQ(b_values.get_data_at(2).to_string(), "lc");
+ EXPECT_EQ(array_result.get_offsets()[2], 4);
+ const auto& nullable_elements = assert_cast<const ColumnNullable&>(array_result.get_data());
+ for (const auto is_null : nullable_elements.get_null_map_data()) {
+ EXPECT_EQ(is_null, 0);
+ }
+ const auto& element_struct =
+ assert_cast<const ColumnStruct&>(nullable_elements.get_nested_column());
+ ASSERT_EQ(element_struct.get_columns().size(), 2);
+ const auto& a_values = assert_cast<const ColumnInt32&>(element_struct.get_column(0));
+ EXPECT_EQ(a_values.get_element(0), 10);
+ EXPECT_EQ(a_values.get_element(1), 20);
+ EXPECT_EQ(a_values.get_element(2), 30);
+ EXPECT_EQ(a_values.get_element(3), 40);
+ const auto& b_values = assert_cast<const ColumnInt32&>(element_struct.get_column(1));
+ EXPECT_EQ(b_values.get_element(0), 11);
+ EXPECT_EQ(b_values.get_element(1), 21);
+ EXPECT_EQ(b_values.get_element(2), 31);
+ EXPECT_EQ(b_values.get_element(3), 41);
ASSERT_TRUE(reader.close().ok());
std::filesystem::remove_all(test_dir);
@@ -1197,7 +1226,8 @@
ASSERT_TRUE(reader.init({
.projected_columns = projected_columns,
.column_predicates = {},
- .conjuncts = {VExprContext::create_shared(
+ .conjuncts = {prepared_conjunct(
+ &state,
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2))},
.format = FileFormat::PARQUET,
.scan_params = nullptr,
@@ -1236,8 +1266,10 @@
projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));
TableColumnPredicates column_predicates;
- column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
- 0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false));
+ add_column_predicate(&column_predicates, projected_columns[0],
+ create_comparison_predicate<PredicateType::GT>(
+ 0, "id", std::make_shared<DataTypeInt32>(),
+ Field::create_field<TYPE_INT>(2), false));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
@@ -1329,7 +1361,8 @@
ASSERT_TRUE(reader.init({
.projected_columns = projected_columns,
.column_predicates = {},
- .conjuncts = {VExprContext::create_shared(
+ .conjuncts = {prepared_conjunct(
+ &state,
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2))},
.format = FileFormat::PARQUET,
.scan_params = nullptr,
@@ -1359,21 +1392,22 @@
ASSERT_TRUE(reader.close().ok());
TableReader filtered_reader;
- ASSERT_TRUE(filtered_reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = {VExprContext::create_shared(
- std::make_shared<TableInt32GreaterThanExpr>(0, 0, 4))},
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
- .ok());
+ ASSERT_TRUE(
+ filtered_reader
+ .init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = {prepared_conjunct(
+ &state, std::make_shared<TableInt32GreaterThanExpr>(0, 0, 4))},
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
ASSERT_TRUE(filtered_reader.prepare_split(build_split_options(file_path)).ok());
block = build_table_block(projected_columns);
@@ -1402,8 +1436,10 @@
projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));
TableColumnPredicates column_predicates;
- column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
- 0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false));
+ add_column_predicate(&column_predicates, projected_columns[1],
+ create_comparison_predicate<PredicateType::GT>(
+ 0, "id", std::make_shared<DataTypeInt32>(),
+ Field::create_field<TYPE_INT>(2), false));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
@@ -1456,8 +1492,10 @@
projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));
TableColumnPredicates column_predicates;
- column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
- 0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false));
+ add_column_predicate(&column_predicates, projected_columns[0],
+ create_comparison_predicate<PredicateType::GT>(
+ 0, "id", std::make_shared<DataTypeInt32>(),
+ Field::create_field<TYPE_INT>(2), false));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
@@ -1520,12 +1558,12 @@
table_filters.push_back({
.conjunct = VExprContext::create_shared(
std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1, 1, 1)),
- .slot_ids = {0, 1},
+ .column_unique_ids = {projected_columns[0], projected_columns[1]},
});
table_filters.push_back({
.conjunct = VExprContext::create_shared(
std::make_shared<TableInt32SumLessThanExpr>(0, 0, 2, 2, 3)),
- .slot_ids = {0, 2},
+ .column_unique_ids = {projected_columns[0], projected_columns[2]},
});
FileScanRequest file_request;
@@ -1570,7 +1608,7 @@
TableFilter table_filter {
.conjunct = VExprContext::create_shared(
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)),
- .slot_ids = {0},
+ .column_unique_ids = {projected_columns[0]},
};
FileScanRequest file_request;
@@ -1584,6 +1622,76 @@
EXPECT_EQ(file_request.column_positions.at(1), 0);
}
+TEST(TableReaderTest, CreateScanRequestUsesColumnNameForByNamePredicateMapping) {
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ const std::vector<TableColumn> projected_columns = {
+ make_table_column(10, "id", int_type),
+ make_table_column(11, "score", int_type),
+ };
+ const std::vector<SchemaField> file_schema = {
+ {.id = 0, .name = "ID", .type = int_type, .children = {}, .column_type = DATA_COLUMN},
+ {.id = 1,
+ .name = "score",
+ .type = int_type,
+ .children = {},
+ .column_type = DATA_COLUMN},
+ };
+
+ TableColumnMapper mapper({.mode = TableColumnMappingMode::BY_NAME});
+ ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok());
+
+ TableFilter table_filter {
+ .conjunct = VExprContext::create_shared(
+ std::make_shared<TableInt32GreaterThanExpr>(10, 10, 1)),
+ .column_unique_ids = {projected_columns[0]},
+ };
+
+ FileScanRequest file_request;
+ ASSERT_TRUE(
+ mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok());
+
+ EXPECT_EQ(projection_ids(file_request.predicate_columns), std::vector<ColumnId>({0}));
+ EXPECT_EQ(projection_ids(file_request.non_predicate_columns), std::vector<ColumnId>({1}));
+ ASSERT_EQ(file_request.conjuncts.size(), 1);
+ const auto* localized_slot = assert_cast<const TableSlotRef*>(
+ file_request.conjuncts[0]->root()->children()[0].get());
+ EXPECT_EQ(localized_slot->slot_id(), 10);
+ EXPECT_EQ(localized_slot->column_id(), 1);
+}
+
+TEST(TableReaderTest, ColumnPredicateFilterUsesColumnNameForByNameMapping) {
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ const std::vector<TableColumn> projected_columns = {
+ make_table_column(10, "id", int_type),
+ make_table_column(11, "score", int_type),
+ };
+ const std::vector<SchemaField> file_schema = {
+ {.id = 0, .name = "ID", .type = int_type, .children = {}, .column_type = DATA_COLUMN},
+ {.id = 1,
+ .name = "score",
+ .type = int_type,
+ .children = {},
+ .column_type = DATA_COLUMN},
+ };
+
+ TableColumnMapper mapper({.mode = TableColumnMappingMode::BY_NAME});
+ ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok());
+
+ TableColumnPredicates column_predicates;
+ add_column_predicate(&column_predicates, projected_columns[0],
+ create_comparison_predicate<PredicateType::GT>(
+ 10, "id", int_type, Field::create_field<TYPE_INT>(2), false));
+
+ FileScanRequest file_request;
+ ASSERT_TRUE(mapper.create_scan_request({}, column_predicates, projected_columns, &file_request)
+ .ok());
+
+ ASSERT_EQ(file_request.column_predicate_filters.size(), 1);
+ EXPECT_EQ(file_request.column_predicate_filters[0].file_column_id, 0);
+ EXPECT_EQ(projection_ids(file_request.non_predicate_columns), std::vector<ColumnId>({0, 1}));
+ EXPECT_TRUE(file_request.predicate_columns.empty());
+}
+
TEST(TableReaderTest, OpenReaderPushesMultiColumnConjunctToParquetReader) {
const auto test_dir =
std::filesystem::temp_directory_path() / "doris_table_reader_multi_conjunct_test";
@@ -1600,22 +1708,21 @@
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(
- reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = {VExprContext::create_shared(
- std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1, 1, 8))},
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
- .ok());
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = {prepared_conjunct(
+ &state, std::make_shared<TableInt32SumGreaterThanExpr>(
+ 0, 0, 1, 1, 8))},
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -1854,7 +1961,8 @@
ASSERT_TRUE(reader.init({
.projected_columns = projected_columns,
.column_predicates = {},
- .conjuncts = {VExprContext::create_shared(
+ .conjuncts = {prepared_conjunct(
+ &state,
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1))},
.format = FileFormat::PARQUET,
.scan_params = nullptr,
@@ -1909,7 +2017,8 @@
ASSERT_TRUE(reader.init({
.projected_columns = projected_columns,
.column_predicates = {},
- .conjuncts = {VExprContext::create_shared(
+ .conjuncts = {prepared_conjunct(
+ &state,
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1))},
.format = FileFormat::PARQUET,
.scan_params = nullptr,
@@ -1962,8 +2071,10 @@
projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));
TableColumnPredicates column_predicates;
- column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
- 0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false));
+ add_column_predicate(&column_predicates, projected_columns[2],
+ create_comparison_predicate<PredicateType::GT>(
+ 0, "id", std::make_shared<DataTypeInt32>(),
+ Field::create_field<TYPE_INT>(2), false));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
doris::iceberg::IcebergTableReader reader;
@@ -2699,10 +2810,8 @@
write_parquet_file(file_path, 7, "seven");
std::vector<TableColumn> projected_columns;
- projected_columns.push_back(
- make_table_column(0, "table_id", std::make_shared<DataTypeInt64>()));
- projected_columns.push_back(
- make_table_column(1, "table_value", std::make_shared<DataTypeString>()));
+ projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt64>()));
+ projected_columns.push_back(make_table_column(1, "value", std::make_shared<DataTypeString>()));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
@@ -2722,17 +2831,16 @@
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
- // The table projection is intentionally different from the Parquet schema:
- // field id 0 is requested as BIGINT instead of the file INT, so ColumnMapper should build a
- // Cast expression; field id 1 has a different table name but the same type, so it should build
- // a SlotRef projection. Both columns should still materialize in table schema order.
+ // The table projection requests id as BIGINT instead of the file INT, so ColumnMapper should
+ // build a Cast expression. The second field has the same type and should build a SlotRef
+ // projection. Both columns should still materialize in table schema order.
Block block = build_table_block(projected_columns);
bool eos = false;
ASSERT_TRUE(reader.get_block(&block, &eos).ok());
ASSERT_FALSE(eos);
- ASSERT_EQ(block.get_by_position(0).name, "table_id");
- ASSERT_EQ(block.get_by_position(1).name, "table_value");
+ ASSERT_EQ(block.get_by_position(0).name, "id");
+ ASSERT_EQ(block.get_by_position(1).name, "value");
const auto& id_column = assert_cast<const ColumnInt64&>(*block.get_by_position(0).column);
const auto& value_column = assert_cast<const ColumnString&>(*block.get_by_position(1).column);
ASSERT_EQ(id_column.size(), 1);