| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <string> |
| |
| #include "common/status.h" |
| #include "core/block/block.h" |
| #include "core/data_type/data_type_array.h" |
| #include "core/data_type/data_type_map.h" |
| #include "core/data_type/data_type_struct.h" |
| #include "format/generic_reader.h" |
| #include "format/parquet/schema_desc.h" |
| #include "runtime/runtime_profile.h" |
| #include "runtime/runtime_state.h" |
| #include "storage/olap_scan_common.h" |
| #include "util/string_util.h" |
| |
| namespace doris { |
| class TFileRangeDesc; |
| class Block; |
| } // namespace doris |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| // Adapter that wraps a file-format reader (parquet, orc) on behalf of a table format |
| // (hudi, iceberg, paimon) and exposes it through the GenericReader interface. |
| // Most calls are forwarded unchanged to the wrapped reader; get_next_block() can additionally |
| // answer a pushed-down COUNT from a table-level row count without touching the file. |
| class TableFormatReader : public GenericReader { |
| public: |
|     // Takes ownership of `file_format_reader`. `params` and `range` are kept as references, |
|     // so they must stay alive for as long as this reader is in use. |
|     TableFormatReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeState* state, |
|                       RuntimeProfile* profile, const TFileScanRangeParams& params, |
|                       const TFileRangeDesc& range, io::IOContext* io_ctx, |
|                       FileMetaCache* meta_cache) |
|             : _file_format_reader(std::move(file_format_reader)), |
|               _state(state), |
|               _profile(profile), |
|               _params(params), |
|               _range(range), |
|               _io_ctx(io_ctx) { |
|         _meta_cache = meta_cache; |
|         // -1 marks the row count as absent; get_next_block() only uses non-negative values. |
|         const auto& format_params = range.table_format_params; |
|         _table_level_row_count = format_params.__isset.table_level_row_count |
|                                          ? format_params.table_level_row_count |
|                                          : -1; |
|     } |
| |
|     ~TableFormatReader() override = default; |
| |
|     // Produces the next block. |
|     // When the pushed-down aggregate is COUNT and a non-negative table-level row count is |
|     // known, every column of `block` is resized to min(remaining count, batch_size) rows, |
|     // the remaining count is reduced accordingly and `*eof` is set to true once it reaches |
|     // zero (`*eof` is not written otherwise). In every other case the call is delegated to |
|     // get_next_block_inner(). |
|     Status get_next_block(Block* block, size_t* read_rows, bool* eof) final { |
|         const bool serve_from_row_count = |
|                 _push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count >= 0; |
|         if (!serve_from_row_count) { |
|             return get_next_block_inner(block, read_rows, eof); |
|         } |
| |
|         const auto batch_limit = static_cast<int64_t>(_state->query_options().batch_size); |
|         const auto rows = std::min(_table_level_row_count, batch_limit); |
|         _table_level_row_count -= rows; |
| |
|         auto columns = block->mutate_columns(); |
|         for (auto& column : columns) { |
|             column->resize(rows); |
|         } |
|         block->set_columns(std::move(columns)); |
| |
|         *read_rows = rows; |
|         if (_table_level_row_count == 0) { |
|             *eof = true; |
|         } |
|         return Status::OK(); |
|     } |
| |
|     // Table-format specific read path, used whenever the COUNT shortcut above does not apply. |
|     virtual Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) = 0; |
| |
|     // Forwarded to the wrapped file-format reader. |
|     Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
|                        std::unordered_set<std::string>* missing_cols) final { |
|         return _file_format_reader->get_columns(name_to_type, missing_cols); |
|     } |
| |
|     // Forwarded to the wrapped file-format reader. |
|     Status get_parsed_schema(std::vector<std::string>* col_names, |
|                              std::vector<DataTypePtr>* col_types) override { |
|         return _file_format_reader->get_parsed_schema(col_names, col_types); |
|     } |
| |
|     // Forwarded to the wrapped file-format reader. |
|     Status set_fill_columns( |
|             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& |
|                     partition_columns, |
|             const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) final { |
|         return _file_format_reader->set_fill_columns(partition_columns, missing_columns); |
|     } |
| |
|     // Forwarded to the wrapped file-format reader. |
|     bool fill_all_columns() const override { return _file_format_reader->fill_all_columns(); } |
| |
|     // Implemented by each concrete table-format reader. |
|     virtual Status init_row_filters() = 0; |
| |
|     // Forwarded to the wrapped file-format reader. |
|     bool count_read_rows() override { return _file_format_reader->count_read_rows(); } |
| |
|     // Hands the context over to the wrapped file-format reader. |
|     void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) override { |
|         _file_format_reader->set_condition_cache_context(std::move(ctx)); |
|     } |
| |
|     // Forwarded to the wrapped file-format reader. |
|     bool has_delete_operations() const override { |
|         return _file_format_reader->has_delete_operations(); |
|     } |
| |
|     // Forwarded to the wrapped file-format reader. |
|     int64_t get_total_rows() const override { return _file_format_reader->get_total_rows(); } |
| |
| protected: |
|     std::string _table_format;                          // hudi, iceberg, paimon |
|     std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc |
|     RuntimeState* _state = nullptr;                     // for query options |
|     RuntimeProfile* _profile = nullptr; |
|     const TFileScanRangeParams& _params; |
|     const TFileRangeDesc& _range; |
|     io::IOContext* _io_ctx = nullptr; |
|     int64_t _table_level_row_count = -1; // for optimization of count(*) push down |
| |
|     // Lets the wrapped reader (if there is one) collect its profile before close. |
|     void _collect_profile_before_close() override { |
|         if (_file_format_reader == nullptr) { |
|             return; |
|         } |
|         _file_format_reader->collect_profile_before_close(); |
|     } |
| }; |
| |
| class TableSchemaChangeHelper { |
| public: |
| ~TableSchemaChangeHelper() = default; |
| |
| // Base class of the table-column -> file-column mapping tree. |
| // Every operation declared here throws std::logic_error; a subclass overrides only the |
| // operations that are meaningful for the kind of column it represents. |
| class Node { |
| public: |
|     virtual ~Node() = default; |
| |
|     // Lookup of a child node by table column name. |
|     virtual std::shared_ptr<Node> get_children_node(std::string table_column_name) const { |
|         _throw_unsupported("get_children_node should not be called on base TableInfoNode"); |
|     } |
| |
|     // Lookup of a child node by file column name. |
|     virtual std::shared_ptr<Node> get_children_node_by_file_column_name( |
|             std::string file_column_name) const { |
|         _throw_unsupported( |
|                 "get_children_node_by_file_column_name should not be called on base " |
|                 "TableInfoNode"); |
|     } |
| |
|     // File column name that corresponds to the given table column name. |
|     virtual std::string children_file_column_name(std::string table_column_name) const { |
|         _throw_unsupported( |
|                 "children_file_column_name should not be called on base TableInfoNode"); |
|     } |
| |
|     // Whether the given table column has a counterpart in the file. |
|     virtual bool children_column_exists(std::string table_column_name) const { |
|         _throw_unsupported("children_column_exists should not be called on base TableInfoNode"); |
|     } |
| |
|     // Element node of an array. |
|     virtual std::shared_ptr<Node> get_element_node() const { |
|         _throw_unsupported("get_element_node should not be called on base TableInfoNode"); |
|     } |
| |
|     // Key node of a map. |
|     virtual std::shared_ptr<Node> get_key_node() const { |
|         _throw_unsupported("get_key_node should not be called on base TableInfoNode"); |
|     } |
| |
|     // Value node of a map. |
|     virtual std::shared_ptr<Node> get_value_node() const { |
|         _throw_unsupported("get_value_node should not be called on base TableInfoNode"); |
|     } |
| |
|     // Registers a table column that has no counterpart in the file. |
|     virtual void add_not_exist_children(std::string table_column_name) { |
|         _throw_unsupported("add_not_exist_children should not be called on base TableInfoNode"); |
|     } |
| |
|     // Registers a table column together with its file column name and child node. |
|     virtual void add_children(std::string table_column_name, std::string file_column_name, |
|                               std::shared_ptr<Node> children_node) { |
|         _throw_unsupported("add_children should not be called on base TableInfoNode"); |
|     } |
| |
| private: |
|     // Single place that raises the "not supported on this node kind" error. |
|     [[noreturn]] static void _throw_unsupported(const char* message) { |
|         throw std::logic_error(message); |
|     } |
| }; |
| |
| // Leaf node: overrides nothing, so every Node operation keeps its throwing base behaviour. |
| class ScalarNode : public Node { |
| }; |
| |
| // Node with named children, keyed by table column name. |
| class StructNode : public Node { |
|     // One entry per table column. |
|     struct StructChild { |
|         const std::shared_ptr<Node> node;  // child node; nullptr for a column missing in the file |
|         const std::string column_name;     // file column name; empty for a missing column |
|         const bool exists;                 // whether the column exists in the file |
|     }; |
| |
|     // table column name -> { node, file_column_name, exists_in_file} |
|     std::map<std::string, StructChild> children; |
| |
| public: |
|     // Returns the child node of `table_column_name`. The column is expected to be registered |
|     // and to exist in the file (checked with DCHECK); an unregistered name makes at() throw. |
|     std::shared_ptr<Node> get_children_node(std::string table_column_name) const override { |
|         DCHECK(children.contains(table_column_name)); |
|         DCHECK(children_column_exists(table_column_name)); |
|         const StructChild& child = children.at(table_column_name); |
|         return child.node; |
|     } |
| |
|     // Returns the node of the first child (in map order) that exists in the file and whose |
|     // file column name equals `file_column_name`; throws std::runtime_error if there is none. |
|     std::shared_ptr<Node> get_children_node_by_file_column_name( |
|             std::string file_column_name) const override { |
|         const auto match = |
|                 std::find_if(children.begin(), children.end(), [&](const auto& entry) { |
|                     const StructChild& candidate = entry.second; |
|                     return candidate.exists && candidate.column_name == file_column_name; |
|                 }); |
|         if (match == children.end()) { |
|             throw std::runtime_error("File column name '" + file_column_name + |
|                                      "' not found in struct children"); |
|         } |
|         return match->second.node; |
|     } |
| |
|     // Returns the file column name recorded for `table_column_name` (same expectations as |
|     // get_children_node). |
|     std::string children_file_column_name(std::string table_column_name) const override { |
|         DCHECK(children.contains(table_column_name)); |
|         DCHECK(children_column_exists(table_column_name)); |
|         const StructChild& child = children.at(table_column_name); |
|         return child.column_name; |
|     } |
| |
|     // Returns the `exists` flag recorded for `table_column_name`. |
|     bool children_column_exists(std::string table_column_name) const override { |
|         DCHECK(children.contains(table_column_name)); |
|         const StructChild& child = children.at(table_column_name); |
|         return child.exists; |
|     } |
| |
|     // Records `table_column_name` as absent from the file. An already registered name is left |
|     // untouched because emplace() does not overwrite. |
|     void add_not_exist_children(std::string table_column_name) override { |
|         children.emplace(table_column_name, |
|                          StructChild {.node = nullptr, .column_name = "", .exists = false}); |
|     } |
| |
|     // Records `table_column_name` as present in the file under `file_column_name`. An already |
|     // registered name is left untouched because emplace() does not overwrite. |
|     void add_children(std::string table_column_name, std::string file_column_name, |
|                       std::shared_ptr<Node> children_node) override { |
|         children.emplace(table_column_name, StructChild {.node = children_node, |
|                                                          .column_name = file_column_name, |
|                                                          .exists = true}); |
|     } |
| |
|     // Read-only view of all registered children. |
|     const std::map<std::string, StructChild>& get_children() const { return children; } |
| }; |
| |
| // Node with a single child: the node supplied as the array's element node. |
| class ArrayNode : public Node { |
| public: |
|     ArrayNode(const std::shared_ptr<Node>& element_node) : _element_node(element_node) {} |
| |
|     // Returns the element node given at construction. |
|     std::shared_ptr<Node> get_element_node() const override { return _element_node; } |
| |
| private: |
|     std::shared_ptr<Node> _element_node; |
| }; |
| |
| // Node with two children: one for the map key and one for the map value. |
| class MapNode : public Node { |
| public: |
|     MapNode(const std::shared_ptr<Node>& key_node, const std::shared_ptr<Node>& value_node) |
|             : _key_node(key_node), _value_node(value_node) {} |
| |
|     // Returns the key node given at construction. |
|     std::shared_ptr<Node> get_key_node() const override { return _key_node; } |
| |
|     // Returns the value node given at construction. |
|     std::shared_ptr<Node> get_value_node() const override { return _value_node; } |
| |
| private: |
|     std::shared_ptr<Node> _key_node; |
|     std::shared_ptr<Node> _value_node; |
| }; |
| |
| // Identity mapping. Use it only when you are sure there has been no schema change between |
| // the table and the file (and mind case sensitivity of the column names): every child lookup |
| // returns the shared instance again, every column is reported as existing, and a table |
| // column name maps to itself. |
| class ConstNode : public Node { |
| public: |
|     // Process-wide shared instance, created on first use. |
|     static const std::shared_ptr<ConstNode>& get_instance() { |
|         static const auto instance = std::make_shared<ConstNode>(); |
|         return instance; |
|     } |
| |
|     std::shared_ptr<Node> get_children_node(std::string table_column_name) const override { |
|         return get_instance(); |
|     } |
| |
|     std::shared_ptr<Node> get_children_node_by_file_column_name( |
|             std::string file_column_name) const override { |
|         return get_instance(); |
|     } |
| |
|     // The file column name is the table column name itself. |
|     std::string children_file_column_name(std::string table_column_name) const override { |
|         return table_column_name; |
|     } |
| |
|     // Every column is reported as present in the file. |
|     bool children_column_exists(std::string table_column_name) const override { return true; } |
| |
|     std::shared_ptr<Node> get_element_node() const override { return get_instance(); } |
| |
|     std::shared_ptr<Node> get_key_node() const override { return get_instance(); } |
| |
|     std::shared_ptr<Node> get_value_node() const override { return get_instance(); } |
| }; |
| |
| static std::string debug(const std::shared_ptr<Node>& root, size_t level = 0); |
| |
| protected: |
| // Whenever external components invoke the Parquet/ORC reader (e.g., init_reader, get_next_block, set_fill_columns), |
| // the parameters passed in are based on `table column names`. |
| // The table_info_node_ptr assists the Parquet/ORC reader in mapping these to the actual |
| // `file column names` to be read, and enables min/max filtering. |
| std::shared_ptr<Node> table_info_node_ptr = std::make_shared<StructNode>(); |
| |
| protected: |
| // Builds table_info_node_ptr for a parquet file. |
| // With history schema info in `params`, the mapping is derived from field ids via the |
| // two-argument overload; without it, columns are matched by name against the parquet schema. |
| Status gen_table_info_node_by_field_id(const TFileScanRangeParams& params, |
|                                        int64_t split_schema_id, |
|                                        const TupleDescriptor* tuple_descriptor, |
|                                        const FieldDescriptor& parquet_field_desc) { |
|     if (params.__isset.history_schema_info) [[likely]] { |
|         return gen_table_info_node_by_field_id(params, split_schema_id); |
|     } |
|     // No schema history available: fall back to name-based matching. |
|     RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(tuple_descriptor, parquet_field_desc, |
|                                                         table_info_node_ptr)); |
|     return Status::OK(); |
| } |
| |
| // Builds table_info_node_ptr for an orc file. |
| // With history schema info in `params`, the mapping is derived from field ids via the |
| // two-argument overload; without it, columns are matched by name against the orc type. |
| Status gen_table_info_node_by_field_id(const TFileScanRangeParams& params, |
|                                        int64_t split_schema_id, |
|                                        const TupleDescriptor* tuple_descriptor, |
|                                        const orc::Type* orc_type_ptr) { |
|     if (params.__isset.history_schema_info) [[likely]] { |
|         return gen_table_info_node_by_field_id(params, split_schema_id); |
|     } |
|     // No schema history available: fall back to name-based matching. |
|     RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, orc_type_ptr, |
|                                                     table_info_node_ptr)); |
|     return Status::OK(); |
| } |
| |
| private: |
| // The field ids of both the table and the file are passed in from FE (params.history_schema_info). |
| // Builds table_info_node_ptr from params.history_schema_info. |
| // - If the file was written with the current schema id, the shared ConstNode is used. |
| // - Otherwise the history entries for the current schema id (table side) and for |
| //   `split_schema_id` (file side) are located and handed to |
| //   BuildTableInfoUtil::by_table_field_id. |
| // Returns InternalError when either entry is missing from the history. |
| Status gen_table_info_node_by_field_id(const TFileScanRangeParams& params, |
|                                        int64_t split_schema_id) { |
|     if (params.current_schema_id == split_schema_id) { |
|         table_info_node_ptr = ConstNode::get_instance(); |
|         return Status::OK(); |
|     } |
| |
|     const auto& history = params.history_schema_info; |
|     int32_t table_schema_idx = -1; |
|     int32_t file_schema_idx = -1; |
|     //todo : Perhaps this process can be optimized by pre-generating a map |
|     // The whole list is scanned, so the last matching entry wins for each side. |
|     for (int32_t idx = 0; idx < history.size(); idx++) { |
|         const auto& schema_id = history[idx].schema_id; |
|         if (schema_id == params.current_schema_id) { |
|             table_schema_idx = idx; |
|         } else if (schema_id == split_schema_id) { |
|             file_schema_idx = idx; |
|         } |
|     } |
| |
|     const bool schema_missing = table_schema_idx == -1 || file_schema_idx == -1; |
|     if (schema_missing) [[unlikely]] { |
|         return Status::InternalError( |
|                 "miss table/file schema info, table_schema_idx:{} file_schema_idx:{}", |
|                 table_schema_idx, file_schema_idx); |
|     } |
| |
|     RETURN_IF_ERROR(BuildTableInfoUtil::by_table_field_id( |
|             history.at(table_schema_idx).root_field, history.at(file_schema_idx).root_field, |
|             table_info_node_ptr)); |
|     return Status::OK(); |
| } |
| |
| public: |
| /* Schema change util. Used to generate `std::shared_ptr<TableSchemaChangeHelper::Node> node`. |
| The node is passed to the parquet/orc reader to find file columns based on table columns. |
| */ |
| // Static builders for the table-column -> file-column mapping tree. Each builder writes its |
| // result into the `node` out-parameter and reports failure through the returned Status. |
| // Only declarations live here; the definitions are outside this header. |
| struct BuildTableInfoUtil { |
|     static const Status SCHEMA_ERROR; |
| |
|     // todo : Maybe I can use templates to implement this functionality. |
| |
|     // ---- hive parquet: match by name ---- |
|     // The table column names passed from fe are lowercase, so lowercase file column names |
|     // are used to match table column names. |
|     static Status by_parquet_name(const TupleDescriptor* table_tuple_descriptor, |
|                                   const FieldDescriptor& parquet_field_desc, |
|                                   std::shared_ptr<TableSchemaChangeHelper::Node>& node, |
|                                   const std::set<TSlotId>* is_file_slot = nullptr); |
| |
|     // hive parquet, overload taking a table data type and a file field. |
|     static Status by_parquet_name(const DataTypePtr& table_data_type, |
|                                   const FieldSchema& file_field, |
|                                   std::shared_ptr<TableSchemaChangeHelper::Node>& node); |
| |
|     // ---- hive orc: match by name ---- |
|     // The table column names passed from fe are lowercase, so lowercase file column names |
|     // are used to match table column names. |
|     static Status by_orc_name(const TupleDescriptor* table_tuple_descriptor, |
|                               const orc::Type* orc_type_ptr, |
|                               std::shared_ptr<TableSchemaChangeHelper::Node>& node, |
|                               const std::set<TSlotId>* is_file_slot = nullptr); |
| |
|     // hive orc, overload taking a table data type and an orc type. |
|     static Status by_orc_name(const DataTypePtr& table_data_type, const orc::Type* orc_root, |
|                               std::shared_ptr<TableSchemaChangeHelper::Node>& node); |
| |
|     // ---- paimon / hudi: match by field id ---- |
|     // Uses the field id in the `table schema` and `history table schema` to match columns. |
|     static Status by_table_field_id(const schema::external::TField table_schema, |
|                                     const schema::external::TField file_schema, |
|                                     std::shared_ptr<TableSchemaChangeHelper::Node>& node); |
| |
|     // paimon / hudi, overload for struct fields. |
|     static Status by_table_field_id(const schema::external::TStructField& table_schema, |
|                                     const schema::external::TStructField& file_schema, |
|                                     std::shared_ptr<TableSchemaChangeHelper::Node>& node); |
| |
|     // ---- iceberg parquet ---- |
|     static Status by_parquet_field_id(const schema::external::TField& table_schema, |
|                                       const FieldSchema& parquet_field, |
|                                       const bool exist_field_id, |
|                                       std::shared_ptr<TableSchemaChangeHelper::Node>& node); |
| |
|     // ---- iceberg orc ---- |
|     static Status by_orc_field_id(const schema::external::TField& table_schema, |
|                                   const orc::Type* orc_root, |
|                                   const std::string& field_id_attribute_key, |
|                                   const bool exist_field_id, |
|                                   std::shared_ptr<TableSchemaChangeHelper::Node>& node); |
| }; |
| }; |
| |
| // Value type bundling two sets of column ids: `column_ids` and `filter_column_ids`. |
| struct ColumnIdResult { |
|     // Both sets start out empty. |
|     ColumnIdResult() = default; |
| |
|     // Takes both sets by value and moves them into place. |
|     ColumnIdResult(std::set<uint64_t> column_ids_, std::set<uint64_t> filter_column_ids_) |
|             : column_ids(std::move(column_ids_)), |
|               filter_column_ids(std::move(filter_column_ids_)) {} |
| |
|     std::set<uint64_t> column_ids; |
|     std::set<uint64_t> filter_column_ids; |
| }; |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |