| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "format/table/iceberg_reader.h" |
| |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/Metrics_types.h> |
| #include <gen_cpp/PlanNodes_types.h> |
| #include <gen_cpp/parquet_types.h> |
| #include <glog/logging.h> |
| #include <parallel_hashmap/phmap.h> |
| #include <rapidjson/document.h> |
| |
| #include <algorithm> |
| #include <cstring> |
| #include <functional> |
| #include <memory> |
| #include <set> |
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/status.h" |
| #include "core/assert_cast.h" |
| #include "core/block/block.h" |
| #include "core/block/column_with_type_and_name.h" |
| #include "core/column/column.h" |
| #include "core/data_type/data_type_factory.hpp" |
| #include "exprs/aggregate/aggregate_function.h" |
| #include "format/format_common.h" |
| #include "format/generic_reader.h" |
| #include "format/orc/vorc_reader.h" |
| #include "format/parquet/schema_desc.h" |
| #include "format/parquet/vparquet_column_chunk_reader.h" |
| #include "format/table/deletion_vector_reader.h" |
| #include "format/table/iceberg/iceberg_orc_nested_column_utils.h" |
| #include "format/table/iceberg/iceberg_parquet_nested_column_utils.h" |
| #include "format/table/iceberg_delete_file_reader_helper.h" |
| #include "format/table/nested_column_access_helper.h" |
| #include "format/table/table_format_reader.h" |
| #include "runtime/runtime_state.h" |
| #include "util/coding.h" |
| |
// Forward declarations only — full definitions are not required in this TU.
namespace cctz {
// NOTE(review): this include sits inside namespace cctz; presumably it only
// defines pragmas/macros (so the enclosing namespace is harmless) — confirm.
#include "common/compile_check_begin.h"
class time_zone;
} // namespace cctz
namespace doris {
class RowDescriptor;
class SlotDescriptor;
class TupleDescriptor;

namespace io {
struct IOContext;
} // namespace io
class VExprContext;
} // namespace doris
| |
| namespace doris { |
| namespace { |
| |
| class GroupedDeleteRowsVisitor final : public IcebergPositionDeleteVisitor { |
| public: |
| using DeleteRows = std::vector<int64_t>; |
| using DeleteFile = phmap::parallel_flat_hash_map< |
| std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>, |
| std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, |
| std::mutex>; |
| |
| explicit GroupedDeleteRowsVisitor(DeleteFile* position_delete) |
| : _position_delete(position_delete) {} |
| |
| Status visit(const std::string& file_path, int64_t pos) override { |
| if (_position_delete == nullptr) { |
| return Status::InvalidArgument("position delete map is null"); |
| } |
| |
| auto iter = _position_delete->find(file_path); |
| DeleteRows* delete_rows = nullptr; |
| if (iter == _position_delete->end()) { |
| delete_rows = new DeleteRows; |
| (*_position_delete)[file_path] = std::unique_ptr<DeleteRows>(delete_rows); |
| } else { |
| delete_rows = iter->second.get(); |
| } |
| delete_rows->push_back(pos); |
| return Status::OK(); |
| } |
| |
| private: |
| DeleteFile* _position_delete; |
| }; |
| |
| } // namespace |
| |
// ORC type attribute key under which iceberg writers store each column's field id.
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
| |
// Constructs the iceberg wrapper around an inner Parquet/ORC reader and
// registers iceberg-specific counters/timers under an "IcebergProfile" node.
// `kv_cache` is kept to share parsed delete files across splits of the same
// data file; ownership of `file_format_reader` transfers to the base class.
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
                                       RuntimeProfile* profile, RuntimeState* state,
                                       const TFileScanRangeParams& params,
                                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
                                       io::IOContext* io_ctx, FileMetaCache* meta_cache)
        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
                            meta_cache),
          _kv_cache(kv_cache) {
    // All counters below are children of this profile node.
    static const char* iceberg_profile = "IcebergProfile";
    ADD_TIMER(_profile, iceberg_profile);
    _iceberg_profile.num_delete_files =
            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile);
    _iceberg_profile.num_delete_rows =
            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
    _iceberg_profile.delete_files_read_time =
            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
    _iceberg_profile.delete_rows_sort_time =
            ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
    _iceberg_profile.parse_delete_file_time =
            ADD_CHILD_TIMER(_profile, "ParseDeleteFileTime", iceberg_profile);
}
| |
| Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { |
| RETURN_IF_ERROR(_expand_block_if_need(block)); |
| |
| RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); |
| |
| if (_equality_delete_impls.size() > 0) { |
| std::unique_ptr<IColumn::Filter> filter = |
| std::make_unique<IColumn::Filter>(block->rows(), 1); |
| for (auto& equality_delete_impl : _equality_delete_impls) { |
| RETURN_IF_ERROR(equality_delete_impl->filter_data_block( |
| block, _col_name_to_block_idx, _id_to_block_column_name, *filter)); |
| } |
| Block::filter_block_internal(block, *filter, block->columns()); |
| } |
| |
| *read_rows = block->rows(); |
| return _shrink_block_if_need(block); |
| } |
| |
// Prepares row-level filtering for the current data file:
//   * short-circuits when a COUNT push-down already supplied a row count,
//   * forwards $row_id metadata to the inner reader when requested,
//   * classifies delete files by content and applies equality deletes,
//     a deletion vector, or position deletes (in that precedence order).
Status IcebergTableReader::init_row_filters() {
    // We get the count value by doris's be, so we don't need to read the delete file
    if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
        return Status::OK();
    }

    const auto& table_desc = _range.table_format_params.iceberg_params;
    const auto& version = table_desc.format_version;
    // Older format versions carry no delete files.
    if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
        return Status::OK();
    }

    // At most one of these casts succeeds, depending on the file format.
    auto* parquet_reader = dynamic_cast<ParquetReader*>(_file_format_reader.get());
    auto* orc_reader = dynamic_cast<OrcReader*>(_file_format_reader.get());

    // Initialize file information for $row_id generation
    // Extract from table_desc which contains current file's metadata
    if (_need_row_id_column) {
        std::string file_path = table_desc.original_file_path;
        int32_t partition_spec_id = 0;
        std::string partition_data_json;
        if (table_desc.__isset.partition_spec_id) {
            partition_spec_id = table_desc.partition_spec_id;
        }
        if (table_desc.__isset.partition_data_json) {
            partition_data_json = table_desc.partition_data_json;
        }

        if (parquet_reader != nullptr) {
            parquet_reader->set_iceberg_rowid_params(file_path, partition_spec_id,
                                                     partition_data_json, _row_id_column_position);
        } else if (orc_reader != nullptr) {
            orc_reader->set_iceberg_rowid_params(file_path, partition_spec_id, partition_data_json,
                                                 _row_id_column_position);
        }
        LOG(INFO) << "Initialized $row_id generation for file: " << file_path
                  << ", partition_spec_id: " << partition_spec_id;
    }

    // Bucket the delete files by content type; each type is applied differently.
    std::vector<TIcebergDeleteFileDesc> position_delete_files;
    std::vector<TIcebergDeleteFileDesc> equality_delete_files;
    std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
    for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
        if (desc.content == POSITION_DELETE) {
            position_delete_files.emplace_back(desc);
        } else if (desc.content == EQUALITY_DELETE) {
            equality_delete_files.emplace_back(desc);
        } else if (desc.content == DELETION_VECTOR) {
            deletion_vector_files.emplace_back(desc);
        }
    }

    if (!equality_delete_files.empty()) {
        RETURN_IF_ERROR(_process_equality_delete(equality_delete_files));
        // Deletes make a pushed-down COUNT incorrect; force full evaluation.
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
    }

    if (!deletion_vector_files.empty()) {
        if (deletion_vector_files.size() != 1) [[unlikely]] {
            /*
             * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
             * at execution time than position delete files. Unlike equality or position delete files, there can be
             * at most one deletion vector for a given data file in a snapshot.
             */
            return Status::DataQualityError("This iceberg data file has multiple DVs.");
        }
        RETURN_IF_ERROR(
                read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));

        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
        // Readers can safely ignore position delete files if there is a DV for a data file.
    } else if (!position_delete_files.empty()) {
        RETURN_IF_ERROR(
                _position_delete_base(table_desc.original_file_path, position_delete_files));
        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
    }

    COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
    return Status::OK();
}
| |
| void IcebergTableReader::_generate_equality_delete_block( |
| Block* block, const std::vector<std::string>& equality_delete_col_names, |
| const std::vector<DataTypePtr>& equality_delete_col_types) { |
| for (int i = 0; i < equality_delete_col_names.size(); ++i) { |
| DataTypePtr data_type = make_nullable(equality_delete_col_types[i]); |
| MutableColumnPtr data_column = data_type->create_column(); |
| block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, |
| equality_delete_col_names[i])); |
| } |
| } |
| |
| Status IcebergTableReader::_expand_block_if_need(Block* block) { |
| std::set<std::string> names; |
| auto block_names = block->get_names(); |
| names.insert(block_names.begin(), block_names.end()); |
| for (auto& col : _expand_columns) { |
| col.column->assume_mutable()->clear(); |
| if (names.contains(col.name)) { |
| return Status::InternalError("Wrong expand column '{}'", col.name); |
| } |
| names.insert(col.name); |
| (*_col_name_to_block_idx)[col.name] = static_cast<uint32_t>(block->columns()); |
| block->insert(col); |
| } |
| return Status::OK(); |
| } |
| |
| Status IcebergTableReader::_shrink_block_if_need(Block* block) { |
| std::set<size_t> positions_to_erase; |
| for (const std::string& expand_col : _expand_col_names) { |
| if (!_col_name_to_block_idx->contains(expand_col)) { |
| return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, |
| block->dump_names()); |
| } |
| positions_to_erase.emplace((*_col_name_to_block_idx)[expand_col]); |
| } |
| block->erase(positions_to_erase); |
| for (const std::string& expand_col : _expand_col_names) { |
| _col_name_to_block_idx->erase(expand_col); |
| } |
| return Status::OK(); |
| } |
| |
| Status IcebergTableReader::_position_delete_base( |
| const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
| std::vector<DeleteRows*> delete_rows_array; |
| int64_t num_delete_rows = 0; |
| for (const auto& delete_file : delete_files) { |
| SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
| Status create_status = Status::OK(); |
| auto* delete_file_cache = _kv_cache->get<DeleteFile>( |
| _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* { |
| auto* position_delete = new DeleteFile; |
| create_status = _read_position_delete_file(delete_file, position_delete); |
| |
| if (!create_status) { |
| return nullptr; |
| } |
| |
| return position_delete; |
| }); |
| if (create_status.is<ErrorCode::END_OF_FILE>()) { |
| continue; |
| } else if (!create_status.ok()) { |
| return create_status; |
| } |
| |
| DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache); |
| auto get_value = [&](const auto& v) { |
| DeleteRows* row_ids = v.second.get(); |
| if (!row_ids->empty()) { |
| delete_rows_array.emplace_back(row_ids); |
| num_delete_rows += row_ids->size(); |
| } |
| }; |
| delete_file_map.if_contains(data_file_path, get_value); |
| } |
| // Use a KV cache to store the delete rows corresponding to a data file path. |
| // The Parquet/ORC reader holds a reference (pointer) to this cached entry. |
| // This allows delete rows to be reused when a single data file is split into |
| // multiple splits, avoiding excessive memory usage when delete rows are large. |
| if (num_delete_rows > 0) { |
| SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time); |
| _iceberg_delete_rows = |
| _kv_cache->get<DeleteRows>(data_file_path, |
| [&]() -> DeleteRows* { |
| auto* data_file_position_delete = new DeleteRows; |
| _sort_delete_rows(delete_rows_array, num_delete_rows, |
| *data_file_position_delete); |
| |
| return data_file_position_delete; |
| } |
| |
| ); |
| set_delete_rows(); |
| COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows); |
| } |
| return Status::OK(); |
| } |
| |
| Status IcebergTableReader::_read_position_delete_file(const TIcebergDeleteFileDesc& delete_file, |
| DeleteFile* position_delete) { |
| GroupedDeleteRowsVisitor visitor(position_delete); |
| IcebergDeleteFileReaderOptions options; |
| options.state = _state; |
| options.profile = _profile; |
| options.scan_params = &_params; |
| options.io_ctx = _io_ctx; |
| options.meta_cache = _meta_cache; |
| options.fs_name = &_range.fs_name; |
| options.batch_size = READ_DELETE_FILE_BATCH_SIZE; |
| return read_iceberg_position_delete_file(delete_file, options, &visitor); |
| } |
| |
| /** |
| * https://iceberg.apache.org/spec/#position-delete-files |
| * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning. |
| * Sorting by file_path allows filter pushdown by file in columnar storage formats. |
| * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory. |
| */ |
| void IcebergTableReader::_sort_delete_rows( |
| const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows, |
| std::vector<int64_t>& result) { |
| if (delete_rows_array.empty()) { |
| return; |
| } |
| if (delete_rows_array.size() == 1) { |
| result.resize(num_delete_rows); |
| memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows); |
| return; |
| } |
| if (delete_rows_array.size() == 2) { |
| result.resize(num_delete_rows); |
| std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(), |
| delete_rows_array.back()->begin(), delete_rows_array.back()->end(), |
| result.begin()); |
| return; |
| } |
| |
| using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>; |
| result.resize(num_delete_rows); |
| auto row_id_iter = result.begin(); |
| auto iter_end = result.end(); |
| std::vector<vec_pair> rows_array; |
| for (auto* rows : delete_rows_array) { |
| if (!rows->empty()) { |
| rows_array.emplace_back(rows->begin(), rows->end()); |
| } |
| } |
| size_t array_size = rows_array.size(); |
| while (row_id_iter != iter_end) { |
| int64_t min_index = 0; |
| int64_t min = *rows_array[0].first; |
| for (size_t i = 0; i < array_size; ++i) { |
| if (*rows_array[i].first < min) { |
| min_index = i; |
| min = *rows_array[i].first; |
| } |
| } |
| *row_id_iter++ = min; |
| rows_array[min_index].first++; |
| if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) { |
| rows_array.erase(rows_array.begin() + min_index); |
| array_size--; |
| } |
| } |
| } |
| |
| Status IcebergParquetReader::init_reader( |
| const std::vector<std::string>& file_col_names, |
| std::unordered_map<std::string, uint32_t>* col_name_to_block_idx, |
| const VExprContextSPtrs& conjuncts, |
| phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>& |
| slot_id_to_predicates, |
| const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, |
| const std::unordered_map<std::string, int>* colname_to_slot_id, |
| const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
| const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) { |
| _file_format = Fileformat::PARQUET; |
| _col_name_to_block_idx = col_name_to_block_idx; |
| auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get()); |
| RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&_data_file_field_desc)); |
| DCHECK(_data_file_field_desc != nullptr); |
| if (_row_lineage_columns != nullptr) { |
| const auto& table_desc = _range.table_format_params.iceberg_params; |
| _row_lineage_columns->first_row_id = |
| table_desc.__isset.first_row_id ? table_desc.first_row_id : -1; |
| _row_lineage_columns->last_updated_sequence_number = |
| table_desc.__isset.last_updated_sequence_number |
| ? table_desc.last_updated_sequence_number |
| : -1; |
| parquet_reader->set_row_lineage_columns(_row_lineage_columns); |
| } |
| |
| auto column_id_result = _create_column_ids(_data_file_field_desc, tuple_descriptor); |
| auto& column_ids = column_id_result.column_ids; |
| const auto& filter_column_ids = column_id_result.filter_column_ids; |
| |
| RETURN_IF_ERROR(init_row_filters()); |
| _all_required_col_names = file_col_names; |
| |
| if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] { |
| RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name( |
| tuple_descriptor, *_data_file_field_desc, table_info_node_ptr)); |
| } else { |
| std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end()); |
| |
| bool exist_field_id = true; |
| for (int idx = 0; idx < _data_file_field_desc->size(); idx++) { |
| if (_data_file_field_desc->get_column(idx)->field_id == -1) { |
| // the data file may be from hive table migrated to iceberg, field id is missing |
| exist_field_id = false; |
| break; |
| } |
| } |
| const auto& table_schema = _params.history_schema_info.front().root_field; |
| |
| table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>(); |
| if (exist_field_id) { |
| // id -> table column name. columns that need read data file. |
| std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field; |
| for (const auto& table_field : table_schema.fields) { |
| auto field = table_field.field_ptr; |
| DCHECK(field->__isset.name); |
| if (!read_col_name_set.contains(field->name)) { |
| continue; |
| } |
| id_to_table_field.emplace(field->id, field); |
| } |
| |
| for (int idx = 0; idx < _data_file_field_desc->size(); idx++) { |
| const auto& data_file_field = _data_file_field_desc->get_column(idx); |
| auto data_file_column_id = _data_file_field_desc->get_column(idx)->field_id; |
| |
| if (id_to_table_field.contains(data_file_column_id)) { |
| const auto& table_field = id_to_table_field[data_file_column_id]; |
| |
| std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr; |
| RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id( |
| *table_field, *data_file_field, exist_field_id, field_node)); |
| table_info_node_ptr->add_children(table_field->name, data_file_field->name, |
| field_node); |
| |
| _id_to_block_column_name.emplace(data_file_column_id, table_field->name); |
| id_to_table_field.erase(data_file_column_id); |
| } else if (_equality_delete_col_ids.contains(data_file_column_id)) { |
| // Columns that need to be read for equality delete. |
| const static std::string EQ_DELETE_PRE = "__equality_delete_column__"; |
| |
| // Construct table column names that avoid duplication with current table schema. |
| // As the columns currently being read may have been deleted in the latest |
| // table structure or have undergone a series of schema changes... |
| std::string table_column_name = EQ_DELETE_PRE + data_file_field->name; |
| table_info_node_ptr->add_children( |
| table_column_name, data_file_field->name, |
| std::make_shared<TableSchemaChangeHelper::ConstNode>()); |
| |
| _id_to_block_column_name.emplace(data_file_column_id, table_column_name); |
| _expand_col_names.emplace_back(table_column_name); |
| auto expand_data_type = make_nullable(data_file_field->data_type); |
| _expand_columns.emplace_back( |
| ColumnWithTypeAndName {expand_data_type->create_column(), |
| expand_data_type, table_column_name}); |
| |
| _all_required_col_names.emplace_back(table_column_name); |
| column_ids.insert(data_file_field->get_column_id()); |
| } |
| } |
| for (const auto& [id, table_field] : id_to_table_field) { |
| table_info_node_ptr->add_not_exist_children(table_field->name); |
| } |
| } else { |
| if (!_equality_delete_col_ids.empty()) [[unlikely]] { |
| return Status::InternalError( |
| "Can not read missing field id data file when have equality delete"); |
| } |
| std::map<std::string, size_t> file_column_idx_map; |
| for (size_t idx = 0; idx < _data_file_field_desc->size(); idx++) { |
| file_column_idx_map.emplace(_data_file_field_desc->get_column(idx)->name, idx); |
| } |
| |
| for (const auto& table_field : table_schema.fields) { |
| DCHECK(table_field.__isset.field_ptr); |
| DCHECK(table_field.field_ptr->__isset.name); |
| const auto& table_column_name = table_field.field_ptr->name; |
| if (!read_col_name_set.contains(table_column_name)) { |
| continue; |
| } |
| if (!table_field.field_ptr->__isset.name_mapping || |
| table_field.field_ptr->name_mapping.size() == 0) { |
| return Status::DataQualityError( |
| "name_mapping must be set when read missing field id data file."); |
| } |
| bool have_mapping = false; |
| for (const auto& mapped_name : table_field.field_ptr->name_mapping) { |
| if (file_column_idx_map.contains(mapped_name)) { |
| std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr; |
| const auto& file_field = _data_file_field_desc->get_column( |
| file_column_idx_map.at(mapped_name)); |
| RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id( |
| *table_field.field_ptr, *file_field, exist_field_id, field_node)); |
| table_info_node_ptr->add_children(table_column_name, file_field->name, |
| field_node); |
| have_mapping = true; |
| break; |
| } |
| } |
| if (!have_mapping) { |
| table_info_node_ptr->add_not_exist_children(table_column_name); |
| } |
| } |
| } |
| } |
| |
| return parquet_reader->init_reader( |
| _all_required_col_names, _col_name_to_block_idx, conjuncts, slot_id_to_predicates, |
| tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts, |
| slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids); |
| } |
| |
| ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc, |
| const TupleDescriptor* tuple_descriptor) { |
| // First, assign column IDs to the field descriptor |
| auto* mutable_field_desc = const_cast<FieldDescriptor*>(field_desc); |
| mutable_field_desc->assign_ids(); |
| |
| // map top-level table column iceberg_id -> FieldSchema* |
| std::unordered_map<int, const FieldSchema*> iceberg_id_to_field_schema_map; |
| |
| for (int i = 0; i < field_desc->size(); ++i) { |
| auto field_schema = field_desc->get_column(i); |
| if (!field_schema) continue; |
| |
| int iceberg_id = field_schema->field_id; |
| iceberg_id_to_field_schema_map[iceberg_id] = field_schema; |
| } |
| |
| std::set<uint64_t> column_ids; |
| std::set<uint64_t> filter_column_ids; |
| |
| // helper to process access paths for a given top-level parquet field |
| auto process_access_paths = [](const FieldSchema* parquet_field, |
| const std::vector<TColumnAccessPath>& access_paths, |
| std::set<uint64_t>& out_ids) { |
| process_nested_access_paths( |
| parquet_field, access_paths, out_ids, |
| [](const FieldSchema* field) { return field->get_column_id(); }, |
| [](const FieldSchema* field) { return field->get_max_column_id(); }, |
| IcebergParquetNestedColumnUtils::extract_nested_column_ids); |
| }; |
| |
| for (const auto* slot : tuple_descriptor->slots()) { |
| auto it = iceberg_id_to_field_schema_map.find(slot->col_unique_id()); |
| if (it == iceberg_id_to_field_schema_map.end()) { |
| // Column not found in file (e.g., partition column, added column) |
| continue; |
| } |
| auto field_schema = it->second; |
| |
| // primitive (non-nested) types: direct mapping by name |
| if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY && |
| slot->col_type() != TYPE_MAP)) { |
| column_ids.insert(field_schema->column_id); |
| |
| if (slot->is_predicate()) { |
| filter_column_ids.insert(field_schema->column_id); |
| } |
| continue; |
| } |
| |
| // complex types: |
| const auto& all_access_paths = slot->all_access_paths(); |
| process_access_paths(field_schema, all_access_paths, column_ids); |
| |
| const auto& predicate_access_paths = slot->predicate_access_paths(); |
| if (!predicate_access_paths.empty()) { |
| process_access_paths(field_schema, predicate_access_paths, filter_column_ids); |
| } |
| } |
| return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); |
| } |
| |
| Status IcebergOrcReader::init_reader( |
| const std::vector<std::string>& file_col_names, |
| std::unordered_map<std::string, uint32_t>* col_name_to_block_idx, |
| const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, |
| const RowDescriptor* row_descriptor, |
| const std::unordered_map<std::string, int>* colname_to_slot_id, |
| const VExprContextSPtrs* not_single_slot_filter_conjuncts, |
| const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) { |
| _file_format = Fileformat::ORC; |
| _col_name_to_block_idx = col_name_to_block_idx; |
| auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get()); |
| RETURN_IF_ERROR(orc_reader->get_file_type(&_data_file_type_desc)); |
| std::vector<std::string> data_file_col_names; |
| std::vector<DataTypePtr> data_file_col_types; |
| RETURN_IF_ERROR(orc_reader->get_parsed_schema(&data_file_col_names, &data_file_col_types)); |
| if (_row_lineage_columns != nullptr) { |
| const auto& table_desc = _range.table_format_params.iceberg_params; |
| _row_lineage_columns->first_row_id = |
| table_desc.__isset.first_row_id ? table_desc.first_row_id : -1; |
| _row_lineage_columns->last_updated_sequence_number = |
| table_desc.__isset.last_updated_sequence_number |
| ? table_desc.last_updated_sequence_number |
| : -1; |
| orc_reader->set_row_lineage_columns(_row_lineage_columns); |
| } |
| |
| auto column_id_result = _create_column_ids(_data_file_type_desc, tuple_descriptor); |
| auto& column_ids = column_id_result.column_ids; |
| const auto& filter_column_ids = column_id_result.filter_column_ids; |
| |
| RETURN_IF_ERROR(init_row_filters()); |
| |
| _all_required_col_names = file_col_names; |
| if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] { |
| RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, _data_file_type_desc, |
| table_info_node_ptr)); |
| } else { |
| std::set<std::string> read_col_name_set(file_col_names.begin(), file_col_names.end()); |
| |
| bool exist_field_id = true; |
| for (size_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) { |
| if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) { |
| exist_field_id = false; |
| break; |
| } |
| } |
| |
| const auto& table_schema = _params.history_schema_info.front().root_field; |
| table_info_node_ptr = std::make_shared<TableSchemaChangeHelper::StructNode>(); |
| if (exist_field_id) { |
| // id -> table column name. columns that need read data file. |
| std::unordered_map<int, std::shared_ptr<schema::external::TField>> id_to_table_field; |
| for (const auto& table_field : table_schema.fields) { |
| auto field = table_field.field_ptr; |
| DCHECK(field->__isset.name); |
| if (!read_col_name_set.contains(field->name)) { |
| continue; |
| } |
| |
| id_to_table_field.emplace(field->id, field); |
| } |
| |
| for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) { |
| const auto& data_file_field = _data_file_type_desc->getSubtype(idx); |
| auto data_file_column_id = |
| std::stoi(data_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE)); |
| auto const& file_column_name = _data_file_type_desc->getFieldName(idx); |
| |
| if (id_to_table_field.contains(data_file_column_id)) { |
| const auto& table_field = id_to_table_field[data_file_column_id]; |
| |
| std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr; |
| RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id( |
| *table_field, data_file_field, ICEBERG_ORC_ATTRIBUTE, exist_field_id, |
| field_node)); |
| table_info_node_ptr->add_children(table_field->name, file_column_name, |
| field_node); |
| |
| _id_to_block_column_name.emplace(data_file_column_id, table_field->name); |
| id_to_table_field.erase(data_file_column_id); |
| } else if (_equality_delete_col_ids.contains(data_file_column_id)) { |
| // Columns that need to be read for equality delete. |
| const static std::string EQ_DELETE_PRE = "__equality_delete_column__"; |
| |
| // Construct table column names that avoid duplication with current table schema. |
| // As the columns currently being read may have been deleted in the latest |
| // table structure or have undergone a series of schema changes... |
| std::string table_column_name = EQ_DELETE_PRE + file_column_name; |
| table_info_node_ptr->add_children( |
| table_column_name, file_column_name, |
| std::make_shared<TableSchemaChangeHelper::ConstNode>()); |
| |
| _id_to_block_column_name.emplace(data_file_column_id, table_column_name); |
| _expand_col_names.emplace_back(table_column_name); |
| |
| auto expand_data_type = make_nullable(data_file_col_types[idx]); |
| _expand_columns.emplace_back( |
| ColumnWithTypeAndName {expand_data_type->create_column(), |
| expand_data_type, table_column_name}); |
| |
| _all_required_col_names.emplace_back(table_column_name); |
| column_ids.insert(data_file_field->getColumnId()); |
| } |
| } |
| for (const auto& [id, table_field] : id_to_table_field) { |
| table_info_node_ptr->add_not_exist_children(table_field->name); |
| } |
| } else { |
| if (!_equality_delete_col_ids.empty()) [[unlikely]] { |
| return Status::InternalError( |
| "Can not read missing field id data file when have equality delete"); |
| } |
| std::map<std::string, size_t> file_column_idx_map; |
| for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) { |
| auto const& file_column_name = _data_file_type_desc->getFieldName(idx); |
| file_column_idx_map.emplace(file_column_name, idx); |
| } |
| |
| for (const auto& table_field : table_schema.fields) { |
| DCHECK(table_field.__isset.field_ptr); |
| DCHECK(table_field.field_ptr->__isset.name); |
| const auto& table_column_name = table_field.field_ptr->name; |
| if (!read_col_name_set.contains(table_column_name)) { |
| continue; |
| } |
| if (!table_field.field_ptr->__isset.name_mapping || |
| table_field.field_ptr->name_mapping.size() == 0) { |
| return Status::DataQualityError( |
| "name_mapping must be set when read missing field id data file."); |
| } |
| auto have_mapping = false; |
| for (const auto& mapped_name : table_field.field_ptr->name_mapping) { |
| if (file_column_idx_map.contains(mapped_name)) { |
| auto file_column_idx = file_column_idx_map.at(mapped_name); |
| std::shared_ptr<TableSchemaChangeHelper::Node> field_node = nullptr; |
| const auto& file_field = _data_file_type_desc->getSubtype(file_column_idx); |
| RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id( |
| *table_field.field_ptr, file_field, ICEBERG_ORC_ATTRIBUTE, |
| exist_field_id, field_node)); |
| table_info_node_ptr->add_children( |
| table_column_name, |
| _data_file_type_desc->getFieldName(file_column_idx), field_node); |
| have_mapping = true; |
| break; |
| } |
| } |
| if (!have_mapping) { |
| table_info_node_ptr->add_not_exist_children(table_column_name); |
| } |
| } |
| } |
| } |
| |
| return orc_reader->init_reader(&_all_required_col_names, _col_name_to_block_idx, conjuncts, |
| false, tuple_descriptor, row_descriptor, |
| not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, |
| table_info_node_ptr, column_ids, filter_column_ids); |
| } |
| |
| ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type, |
| const TupleDescriptor* tuple_descriptor) { |
| // map top-level table column iceberg_id -> orc::Type* |
| std::unordered_map<int, const orc::Type*> iceberg_id_to_orc_type_map; |
| for (uint64_t i = 0; i < orc_type->getSubtypeCount(); ++i) { |
| auto orc_sub_type = orc_type->getSubtype(i); |
| if (!orc_sub_type) continue; |
| |
| if (!orc_sub_type->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) { |
| continue; |
| } |
| int iceberg_id = std::stoi(orc_sub_type->getAttributeValue(ICEBERG_ORC_ATTRIBUTE)); |
| iceberg_id_to_orc_type_map[iceberg_id] = orc_sub_type; |
| } |
| |
| std::set<uint64_t> column_ids; |
| std::set<uint64_t> filter_column_ids; |
| |
| // helper to process access paths for a given top-level orc field |
| auto process_access_paths = [](const orc::Type* orc_field, |
| const std::vector<TColumnAccessPath>& access_paths, |
| std::set<uint64_t>& out_ids) { |
| process_nested_access_paths( |
| orc_field, access_paths, out_ids, |
| [](const orc::Type* type) { return type->getColumnId(); }, |
| [](const orc::Type* type) { return type->getMaximumColumnId(); }, |
| IcebergOrcNestedColumnUtils::extract_nested_column_ids); |
| }; |
| |
| for (const auto* slot : tuple_descriptor->slots()) { |
| auto it = iceberg_id_to_orc_type_map.find(slot->col_unique_id()); |
| if (it == iceberg_id_to_orc_type_map.end()) { |
| // Column not found in file |
| continue; |
| } |
| const orc::Type* orc_field = it->second; |
| |
| // primitive (non-nested) types |
| if ((slot->col_type() != TYPE_STRUCT && slot->col_type() != TYPE_ARRAY && |
| slot->col_type() != TYPE_MAP)) { |
| column_ids.insert(orc_field->getColumnId()); |
| if (slot->is_predicate()) { |
| filter_column_ids.insert(orc_field->getColumnId()); |
| } |
| continue; |
| } |
| |
| // complex types |
| const auto& all_access_paths = slot->all_access_paths(); |
| process_access_paths(orc_field, all_access_paths, column_ids); |
| |
| const auto& predicate_access_paths = slot->predicate_access_paths(); |
| if (!predicate_access_paths.empty()) { |
| process_access_paths(orc_field, predicate_access_paths, filter_column_ids); |
| } |
| } |
| |
| return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); |
| } |
| |
| // Directly read the deletion vector using the `content_offset` and |
| // `content_size_in_bytes` provided by FE in `delete_file_desc`. |
| // These two fields indicate the location of a blob in storage. |
| // Since the current format is `deletion-vector-v1`, which does not |
| // compress any blobs, we can temporarily skip parsing the Puffin footer. |
| Status IcebergTableReader::read_deletion_vector(const std::string& data_file_path, |
| const TIcebergDeleteFileDesc& delete_file_desc) { |
| Status create_status = Status::OK(); |
| SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
| _iceberg_delete_rows = _kv_cache->get<DeleteRows>(data_file_path, [&]() -> DeleteRows* { |
| auto* delete_rows = new DeleteRows; |
| |
| TFileRangeDesc delete_range; |
| // must use __set() method to make sure __isset is true |
| delete_range.__set_fs_name(_range.fs_name); |
| delete_range.path = delete_file_desc.path; |
| delete_range.start_offset = delete_file_desc.content_offset; |
| delete_range.size = delete_file_desc.content_size_in_bytes; |
| delete_range.file_size = -1; |
| |
| // We may consider caching the DeletionVectorReader when reading Puffin files, |
| // where the underlying reader is an `InMemoryFileReader` and a single data file is |
| // split into multiple splits. However, we need to ensure that the underlying |
| // reader supports multi-threaded access. |
| DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx); |
| create_status = dv_reader.open(); |
| if (!create_status.ok()) [[unlikely]] { |
| return nullptr; |
| } |
| |
| size_t buffer_size = delete_range.size; |
| std::vector<char> buf(buffer_size); |
| if (buffer_size < 12) [[unlikely]] { |
| // Minimum size: 4 bytes length + 4 bytes magic + 4 bytes CRC32 |
| create_status = Status::DataQualityError("Deletion vector file size too small: {}", |
| buffer_size); |
| return nullptr; |
| } |
| |
| create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size}); |
| if (!create_status) [[unlikely]] { |
| return nullptr; |
| } |
| // The serialized blob contains: |
| // |
| // Combined length of the vector and magic bytes stored as 4 bytes, big-endian |
| // A 4-byte magic sequence, D1 D3 39 64 |
| // The vector, serialized as described below |
| // A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian |
| |
| auto total_length = BigEndian::Load32(buf.data()); |
| if (total_length + 8 != buffer_size) [[unlikely]] { |
| create_status = Status::DataQualityError( |
| "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8, |
| buffer_size); |
| return nullptr; |
| } |
| |
| constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
| if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] { |
| create_status = Status::DataQualityError("Deletion vector magic number mismatch"); |
| return nullptr; |
| } |
| |
| roaring::Roaring64Map bitmap; |
| SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
| try { |
| bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12); |
| } catch (const std::runtime_error& e) { |
| create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
| return nullptr; |
| } |
| // skip CRC-32 checksum |
| |
| delete_rows->reserve(bitmap.cardinality()); |
| for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
| delete_rows->push_back(*it); |
| } |
| COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size()); |
| return delete_rows; |
| }); |
| |
| RETURN_IF_ERROR(create_status); |
| if (!_iceberg_delete_rows->empty()) [[likely]] { |
| set_delete_rows(); |
| } |
| return Status::OK(); |
| } |
| |
// Similar to the code structure of IcebergOrcReader::_process_equality_delete,
// but considering the significant differences in how parquet/orc obtains
// attributes/column IDs, it is not easy to combine them.
//
// Reads every equality delete file, validates its columns against the data file
// schema by iceberg field id, accumulates the delete rows into per-column-id-set
// blocks, and finally builds one EqualityDelete implementation per distinct set.
Status IcebergParquetReader::_process_equality_delete(
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
    // Passed through to set_fill_columns(); intentionally left empty because the
    // delete file itself supplies every column we read from it.
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;

    // Index the *data* file schema by iceberg field id so each delete column can be
    // matched (and type-checked) against the corresponding data-file column.
    std::map<int, const FieldSchema*> data_file_id_to_field_schema;
    for (int idx = 0; idx < _data_file_field_desc->size(); ++idx) {
        auto field_schema = _data_file_field_desc->get_column(idx);
        if (_data_file_field_desc->get_column(idx)->field_id == -1) {
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
        }
        data_file_id_to_field_schema[_data_file_field_desc->get_column(idx)->field_id] =
                field_schema;
    }

    for (const auto& delete_file : delete_files) {
        // Read the whole delete file: offset 0 with unknown (-1) size/file_size.
        TFileRangeDesc delete_desc;
        // must use __set() method to make sure __isset is true
        delete_desc.__set_fs_name(_range.fs_name);
        delete_desc.path = delete_file.path;
        delete_desc.start_offset = 0;
        delete_desc.size = -1;
        delete_desc.file_size = -1;

        if (!delete_file.__isset.field_ids) [[unlikely]] {
            return Status::InternalError(
                    "missing delete field ids when reading equality delete file");
        }
        // The equality field ids of this delete file; also accumulated globally in
        // _equality_delete_col_ids. The set copy below is drained as columns are
        // matched, so any leftovers mean a field id was absent from the file.
        auto& read_column_field_ids = delete_file.field_ids;
        std::set<int> read_column_field_ids_set;
        for (const auto& field_id : read_column_field_ids) {
            read_column_field_ids_set.insert(field_id);
            _equality_delete_col_ids.insert(field_id);
        }

        auto delete_reader = ParquetReader::create_unique(
                _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
                &_state->timezone_obj(), _io_ctx, _state, _meta_cache);
        RETURN_IF_ERROR(delete_reader->init_schema_reader());

        // The columns to read from the equality delete file.
        // (the delete file may have extra columns that we don't need to read)
        std::vector<std::string> delete_col_names;
        std::vector<DataTypePtr> delete_col_types;
        std::vector<int> delete_col_ids;
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;

        const FieldDescriptor* delete_field_desc = nullptr;
        RETURN_IF_ERROR(delete_reader->get_file_metadata_schema(&delete_field_desc));
        DCHECK(delete_field_desc != nullptr);

        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
        for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) {
            if (delete_file_field.field_id == -1) [[unlikely]] { // missing delete_file_field id
                // equality delete file must have delete_file_field id to match column.
                return Status::DataQualityError(
                        "missing delete_file_field id when reading equality delete file");
            } else if (read_column_field_ids_set.contains(delete_file_field.field_id)) {
                // A column we need to read.
                if (delete_file_field.children.size() > 0) [[unlikely]] { // complex column
                    return Status::InternalError(
                            "can not support read complex column in equality delete file");
                } else if (!data_file_id_to_field_schema.contains(delete_file_field.field_id))
                        [[unlikely]] {
                    return Status::DataQualityError(
                            "can not find delete field id in data file schema when reading "
                            "equality delete file");
                }
                // Reject any type change between the delete file and the data file.
                auto data_file_field = data_file_id_to_field_schema[delete_file_field.field_id];
                if (data_file_field->data_type->get_primitive_type() !=
                    delete_file_field.data_type->get_primitive_type()) [[unlikely]] {
                    return Status::NotSupported(
                            "Not Support type change in equality delete, field: {}, delete "
                            "file type: {}, data file type: {}",
                            delete_file_field.field_id, delete_file_field.data_type->get_name(),
                            data_file_field->data_type->get_name());
                }

                std::string filed_lower_name = to_lower(delete_file_field.name);
                eq_file_node->add_children(filed_lower_name, delete_file_field.name,
                                           std::make_shared<TableSchemaChangeHelper::ScalarNode>());

                // Delete columns are always read as nullable regardless of the
                // file schema.
                delete_col_ids.emplace_back(delete_file_field.field_id);
                delete_col_names.emplace_back(filed_lower_name);
                delete_col_types.emplace_back(make_nullable(delete_file_field.data_type));

                read_column_field_ids_set.erase(delete_file_field.field_id);
            } else {
                // The delete file may have extra columns that we don't need to read.
            }
        }
        // Every requested field id must have been matched against the file schema.
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
            return Status::DataQualityError("some field ids not found in equality delete file.");
        }

        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
        }
        // No predicate pushdown for delete files: pass an empty predicate map.
        phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
        RETURN_IF_ERROR(delete_reader->init_reader(delete_col_names, &delete_col_name_to_block_idx,
                                                   {}, tmp, nullptr, nullptr, nullptr, nullptr,
                                                   nullptr, eq_file_node, false));
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));

        // Delete files with identical column-id sets share one accumulated block.
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
            Block block;
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
            _equality_delete_blocks.emplace_back(block);
        }
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
        bool eof = false;
        // Append the whole delete file, batch by batch, into the shared block.
        while (!eof) {
            Block tmp_block;
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
            size_t read_rows = 0;
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
            if (read_rows > 0) {
                MutableBlock mutable_block(&eq_file_block);
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
            }
        }
    }

    // Build one delete implementation per distinct column-id set.
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
        auto& eq_file_block = _equality_delete_blocks[block_idx];
        auto equality_delete_impl =
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
    }
    return Status::OK();
}
| |
// ORC counterpart of IcebergParquetReader::_process_equality_delete: reads every
// equality delete file, validates its columns against the data file schema by
// iceberg field id (stored as an ORC type attribute), accumulates delete rows into
// per-column-id-set blocks, then builds one EqualityDelete impl per distinct set.
Status IcebergOrcReader::_process_equality_delete(
        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
    // Passed through to set_fill_columns(); intentionally left empty because the
    // delete file itself supplies every column we read from it.
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;

    // Index the *data* file schema by iceberg field id so each delete column can be
    // matched against the corresponding data-file column index.
    std::map<int, int> data_file_id_to_field_idx;
    for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); ++idx) {
        if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) {
            return Status::DataQualityError("Iceberg equality delete data file missing field id.");
        }
        // NOTE(review): std::stoi throws on a malformed attribute value; assumes the
        // writer always emitted a valid integer id — confirm upstream.
        auto field_id = std::stoi(
                _data_file_type_desc->getSubtype(idx)->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
        data_file_id_to_field_idx[field_id] = idx;
    }

    for (const auto& delete_file : delete_files) {
        // Read the whole delete file: offset 0 with unknown (-1) size/file_size.
        TFileRangeDesc delete_desc;
        // must use __set() method to make sure __isset is true
        delete_desc.__set_fs_name(_range.fs_name);
        delete_desc.path = delete_file.path;
        delete_desc.start_offset = 0;
        delete_desc.size = -1;
        delete_desc.file_size = -1;

        if (!delete_file.__isset.field_ids) [[unlikely]] {
            return Status::InternalError(
                    "missing delete field ids when reading equality delete file");
        }
        // The equality field ids of this delete file; also accumulated globally in
        // _equality_delete_col_ids. The set copy below is drained as columns are
        // matched, so any leftovers mean a field id was absent from the file.
        auto& read_column_field_ids = delete_file.field_ids;
        std::set<int> read_column_field_ids_set;
        for (const auto& field_id : read_column_field_ids) {
            read_column_field_ids_set.insert(field_id);
            _equality_delete_col_ids.insert(field_id);
        }

        auto delete_reader = OrcReader::create_unique(_profile, _state, _params, delete_desc,
                                                      READ_DELETE_FILE_BATCH_SIZE,
                                                      _state->timezone(), _io_ctx, _meta_cache);
        RETURN_IF_ERROR(delete_reader->init_schema_reader());
        // delete file schema (names and Doris data types, indexed by subtype position)
        std::vector<std::string> delete_file_col_names;
        std::vector<DataTypePtr> delete_file_col_types;
        RETURN_IF_ERROR(
                delete_reader->get_parsed_schema(&delete_file_col_names, &delete_file_col_types));

        // The columns to read from the equality delete file.
        // (the delete file may have extra columns that we don't need to read)
        std::vector<std::string> delete_col_names;
        std::vector<DataTypePtr> delete_col_types;
        std::vector<int> delete_col_ids;
        std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx;

        const orc::Type* delete_field_desc = nullptr;
        RETURN_IF_ERROR(delete_reader->get_file_type(&delete_field_desc));
        DCHECK(delete_field_desc != nullptr);

        auto eq_file_node = std::make_shared<TableSchemaChangeHelper::StructNode>();

        for (size_t idx = 0; idx < delete_field_desc->getSubtypeCount(); idx++) {
            auto delete_file_field = delete_field_desc->getSubtype(idx);

            if (!delete_file_field->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE))
                    [[unlikely]] { // missing delete_file_field id
                // equality delete file must have delete_file_field id to match column.
                return Status::DataQualityError(
                        "missing delete_file_field id when reading equality delete file");
            } else {
                auto delete_field_id =
                        std::stoi(delete_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE));
                if (read_column_field_ids_set.contains(delete_field_id)) {
                    // A column we need to read.
                    if (is_complex_type(delete_file_col_types[idx]->get_primitive_type()))
                            [[unlikely]] {
                        return Status::InternalError(
                                "can not support read complex column in equality delete file.");
                    } else if (!data_file_id_to_field_idx.contains(delete_field_id)) [[unlikely]] {
                        return Status::DataQualityError(
                                "can not find delete field id in data file schema when reading "
                                "equality delete file");
                    }

                    auto data_file_field = _data_file_type_desc->getSubtype(
                            data_file_id_to_field_idx[delete_field_id]);

                    // Reject any type change between the delete file and the data file.
                    if (delete_file_field->getKind() != data_file_field->getKind()) [[unlikely]] {
                        return Status::NotSupported(
                                "Not Support type change in equality delete, field: {}, delete "
                                "file type: {}, data file type: {}",
                                delete_field_id, delete_file_field->getKind(),
                                data_file_field->getKind());
                    }
                    std::string filed_lower_name = to_lower(delete_field_desc->getFieldName(idx));
                    eq_file_node->add_children(
                            filed_lower_name, delete_field_desc->getFieldName(idx),
                            std::make_shared<TableSchemaChangeHelper::ScalarNode>());

                    // Delete columns are always read as nullable regardless of the
                    // file schema.
                    delete_col_ids.emplace_back(delete_field_id);
                    delete_col_names.emplace_back(filed_lower_name);
                    delete_col_types.emplace_back(make_nullable(delete_file_col_types[idx]));
                    read_column_field_ids_set.erase(delete_field_id);
                }
            }
        }
        // Every requested field id must have been matched against the file schema.
        if (!read_column_field_ids_set.empty()) [[unlikely]] {
            return Status::DataQualityError("some field ids not found in equality delete file.");
        }

        for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) {
            delete_col_name_to_block_idx[delete_col_names[idx]] = idx;
        }

        RETURN_IF_ERROR(delete_reader->init_reader(&delete_col_names, &delete_col_name_to_block_idx,
                                                   {}, false, nullptr, nullptr, nullptr, nullptr,
                                                   eq_file_node));
        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));

        // Delete files with identical column-id sets share one accumulated block.
        if (!_equality_delete_block_map.contains(delete_col_ids)) {
            _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size());
            Block block;
            _generate_equality_delete_block(&block, delete_col_names, delete_col_types);
            _equality_delete_blocks.emplace_back(block);
        }
        Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]];
        bool eof = false;
        // Append the whole delete file, batch by batch, into the shared block.
        while (!eof) {
            Block tmp_block;
            _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types);
            size_t read_rows = 0;
            RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof));
            if (read_rows > 0) {
                MutableBlock mutable_block(&eq_file_block);
                RETURN_IF_ERROR(mutable_block.merge(tmp_block));
            }
        }
    }

    // Build one delete implementation per distinct column-id set.
    for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) {
        auto& eq_file_block = _equality_delete_blocks[block_idx];
        auto equality_delete_impl =
                EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids);
        RETURN_IF_ERROR(equality_delete_impl->init(_profile));
        _equality_delete_impls.emplace_back(std::move(equality_delete_impl));
    }
    return Status::OK();
}
| #include "common/compile_check_end.h" |
| } // namespace doris |