| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <string> |
| #include <unordered_map> |
| #include <vector> |
| |
| #include "common/consts.h" |
| #include "common/status.h" |
| #include "core/block/block.h" |
| #include "core/column/column_dictionary.h" |
| #include "core/column/column_nullable.h" |
| #include "core/column/column_string.h" |
| #include "core/column/column_struct.h" |
| #include "core/data_type/data_type_number.h" |
| #include "core/data_type/data_type_string.h" |
| #include "format/generic_reader.h" |
| #include "format/table/deletion_vector_reader.h" |
| #include "format/table/equality_delete.h" |
| #include "format/table/table_schema_change_helper.h" |
| #include "runtime/runtime_profile.h" |
| #include "runtime/runtime_state.h" |
| #include "storage/olap_common.h" |
| |
| namespace doris { |
| class TIcebergDeleteFileDesc; |
| } // namespace doris |
| |
| namespace doris { |
| |
| class ShardedKVCache; |
| |
| // CRTP mixin for Iceberg reader functionality. |
| // BaseReader should be ParquetReader or OrcReader. |
| // Inherits BaseReader + TableSchemaChangeHelper, providing shared Iceberg logic |
| // (delete files, deletion vectors, equality delete, $row_id synthesis). |
| // |
| // Inheritance chain: |
| // IcebergParquetReader -> IcebergReaderMixin<ParquetReader> -> ParquetReader -> GenericReader |
| // IcebergOrcReader -> IcebergReaderMixin<OrcReader> -> OrcReader -> GenericReader |
| template <typename BaseReader> |
| class IcebergReaderMixin : public BaseReader, public TableSchemaChangeHelper { |
| public: |
| struct PositionDeleteRange { |
| std::vector<std::string> data_file_path; |
| std::vector<std::pair<int, int>> range; |
| }; |
| |
| // Forward BaseReader constructor arguments + Iceberg-specific kv_cache |
| template <typename... Args> |
| IcebergReaderMixin(ShardedKVCache* kv_cache, Args&&... args) |
| : BaseReader(std::forward<Args>(args)...), _kv_cache(kv_cache) { |
| static const char* iceberg_profile = "IcebergProfile"; |
| ADD_TIMER(this->get_profile(), iceberg_profile); |
| _iceberg_profile.num_delete_files = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteFiles", |
| TUnit::UNIT, iceberg_profile); |
| _iceberg_profile.num_delete_rows = ADD_CHILD_COUNTER(this->get_profile(), "NumDeleteRows", |
| TUnit::UNIT, iceberg_profile); |
| _iceberg_profile.delete_files_read_time = |
| ADD_CHILD_TIMER(this->get_profile(), "DeleteFileReadTime", iceberg_profile); |
| _iceberg_profile.delete_rows_sort_time = |
| ADD_CHILD_TIMER(this->get_profile(), "DeleteRowsSortTime", iceberg_profile); |
| _iceberg_profile.parse_delete_file_time = |
| ADD_CHILD_TIMER(this->get_profile(), "ParseDeleteFileTime", iceberg_profile); |
| } |
| |
| ~IcebergReaderMixin() override = default; |
| |
| void set_current_file_info(const std::string& file_path, int32_t partition_spec_id, |
| const std::string& partition_data_json) { |
| _current_file_path = file_path; |
| _partition_spec_id = partition_spec_id; |
| _partition_data_json = partition_data_json; |
| } |
| |
| enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR }; |
| enum Fileformat { NONE, PARQUET, ORC, AVRO }; |
| |
| virtual void set_delete_rows() = 0; |
| |
| // Table-level COUNT(*) is handled by CountReader (created by FileScanner after |
| // init_reader). If _do_get_next_block is called, COUNT must have been resolved. |
| Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override { |
| DCHECK(this->_push_down_agg_type != TPushAggOp::type::COUNT); |
| return BaseReader::_do_get_next_block(block, read_rows, eof); |
| } |
| |
| void set_create_row_id_column_iterator_func( |
| std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> create_func) { |
| _create_topn_row_id_column_iterator = create_func; |
| } |
| |
| protected: |
| // ---- Hook implementations ---- |
| |
| // Called before reading a block: expand block for equality delete columns + detect row_id |
| Status on_before_read_block(Block* block) override { |
| RETURN_IF_ERROR(_expand_block_if_need(block)); |
| return Status::OK(); |
| } |
| |
| /// Fill Iceberg $row_id synthesized column. Registered as handler during init. |
| Status _fill_iceberg_row_id(Block* block, size_t rows) { |
| int row_id_pos = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL); |
| DORIS_CHECK(row_id_pos >= 0); |
| |
| // Lazy-init file info: only set when $row_id is actually needed. |
| const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
| std::string file_path = table_desc.original_file_path; |
| int32_t partition_spec_id = |
| table_desc.__isset.partition_spec_id ? table_desc.partition_spec_id : 0; |
| std::string partition_data_json; |
| if (table_desc.__isset.partition_data_json) { |
| partition_data_json = table_desc.partition_data_json; |
| } |
| set_current_file_info(file_path, partition_spec_id, partition_data_json); |
| |
| const auto& row_ids = this->current_batch_row_positions(); |
| auto& col_with_type = block->get_by_position(static_cast<size_t>(row_id_pos)); |
| MutableColumnPtr row_id_column; |
| RETURN_IF_ERROR(_build_iceberg_rowid_column(col_with_type.type, _current_file_path, row_ids, |
| _partition_spec_id, _partition_data_json, |
| &row_id_column)); |
| col_with_type.column = std::move(row_id_column); |
| return Status::OK(); |
| } |
| |
| void _init_row_lineage_columns() { |
| const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
| if (table_desc.__isset.first_row_id) { |
| _row_lineage_columns.first_row_id = table_desc.first_row_id; |
| } |
| if (table_desc.__isset.last_updated_sequence_number) { |
| _row_lineage_columns.last_updated_sequence_number = |
| table_desc.last_updated_sequence_number; |
| } |
| } |
| |
| Status _fill_row_lineage_row_id(Block* block, size_t rows) { |
| int col_pos = block->get_position_by_name(ROW_LINEAGE_ROW_ID); |
| DORIS_CHECK(col_pos >= 0); |
| |
| if (_row_lineage_columns.first_row_id >= 0) { |
| auto column_guard = block->mutate_column_scoped(col_pos); |
| auto* nullable_column = |
| assert_cast<ColumnNullable*>(column_guard.mutable_column().get()); |
| auto& null_map = nullable_column->get_null_map_data(); |
| auto& data = |
| assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data(); |
| const auto& row_ids = this->current_batch_row_positions(); |
| for (size_t i = 0; i < rows; ++i) { |
| if (null_map[i] != 0) { |
| null_map[i] = 0; |
| data[i] = _row_lineage_columns.first_row_id + static_cast<int64_t>(row_ids[i]); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status _fill_row_lineage_last_updated_sequence_number(Block* block, size_t rows) { |
| int col_pos = block->get_position_by_name(ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER); |
| DORIS_CHECK(col_pos >= 0); |
| |
| if (_row_lineage_columns.last_updated_sequence_number >= 0) { |
| auto column_guard = block->mutate_column_scoped(col_pos); |
| auto* nullable_column = |
| assert_cast<ColumnNullable*>(column_guard.mutable_column().get()); |
| auto& null_map = nullable_column->get_null_map_data(); |
| auto& data = |
| assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data(); |
| for (size_t i = 0; i < rows; ++i) { |
| if (null_map[i] != 0) { |
| null_map[i] = 0; |
| data[i] = _row_lineage_columns.last_updated_sequence_number; |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // Called after reading a block: apply equality delete filter + shrink block |
| Status on_after_read_block(Block* block, size_t* read_rows) override { |
| if (!_equality_delete_impls.empty()) { |
| std::unique_ptr<IColumn::Filter> filter = |
| std::make_unique<IColumn::Filter>(block->rows(), 1); |
| for (auto& equality_delete_impl : _equality_delete_impls) { |
| RETURN_IF_ERROR(equality_delete_impl->filter_data_block( |
| block, this->col_name_to_block_idx_ref(), _id_to_block_column_name, |
| *filter)); |
| } |
| Block::filter_block_internal(block, *filter, block->columns()); |
| *read_rows = block->rows(); |
| } |
| return _shrink_block_if_need(block); |
| } |
| |
| // ---- Shared Iceberg methods ---- |
| |
| Status _init_row_filters(); |
| Status _position_delete_base(const std::string data_file_path, |
| const std::vector<TIcebergDeleteFileDesc>& delete_files); |
| Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files); |
| Status read_deletion_vector(const std::string& data_file_path, |
| const TIcebergDeleteFileDesc& delete_file_desc); |
| |
| Status _expand_block_if_need(Block* block); |
| Status _shrink_block_if_need(Block* block); |
| |
| // Type aliases — must be defined before member function declarations that use them. |
| using DeleteRows = std::vector<int64_t>; |
| using DeleteFile = phmap::parallel_flat_hash_map< |
| std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>, |
| std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, |
| std::mutex>; |
| |
| PositionDeleteRange _get_range(const ColumnDictI32& file_path_column); |
| PositionDeleteRange _get_range(const ColumnString& file_path_column); |
| static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array, |
| int64_t num_delete_rows, std::vector<int64_t>& result); |
| void _gen_position_delete_file_range(Block& block, DeleteFile* position_delete, |
| size_t read_rows, bool file_path_column_dictionary_coded); |
| void _generate_equality_delete_block(Block* block, |
| const std::vector<std::string>& equality_delete_col_names, |
| const std::vector<DataTypePtr>& equality_delete_col_types); |
| |
| // Pure virtual: format-specific delete file reading |
| virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0; |
| virtual std::unique_ptr<GenericReader> _create_equality_reader( |
| const TFileRangeDesc& delete_desc) = 0; |
| |
| static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; } |
| |
| /// Build the Iceberg V2 row-id struct column. |
| static Status _build_iceberg_rowid_column(const DataTypePtr& type, const std::string& file_path, |
| const std::vector<rowid_t>& row_ids, |
| int32_t partition_spec_id, |
| const std::string& partition_data_json, |
| MutableColumnPtr* column_out) { |
| if (type == nullptr || column_out == nullptr) { |
| return Status::InvalidArgument("Invalid iceberg rowid column type or output column"); |
| } |
| MutableColumnPtr column = type->create_column(); |
| ColumnNullable* nullable_col = check_and_get_column<ColumnNullable>(column.get()); |
| ColumnStruct* struct_col = nullptr; |
| if (nullable_col != nullptr) { |
| struct_col = |
| check_and_get_column<ColumnStruct>(nullable_col->get_nested_column_ptr().get()); |
| } else { |
| struct_col = check_and_get_column<ColumnStruct>(column.get()); |
| } |
| if (struct_col == nullptr || struct_col->tuple_size() < 4) { |
| return Status::InternalError("Invalid iceberg rowid column structure"); |
| } |
| size_t num_rows = row_ids.size(); |
| auto& file_path_col = struct_col->get_column(0); |
| auto& row_pos_col = struct_col->get_column(1); |
| auto& spec_id_col = struct_col->get_column(2); |
| auto& partition_data_col = struct_col->get_column(3); |
| file_path_col.reserve(num_rows); |
| row_pos_col.reserve(num_rows); |
| spec_id_col.reserve(num_rows); |
| partition_data_col.reserve(num_rows); |
| for (size_t i = 0; i < num_rows; ++i) { |
| file_path_col.insert_data(file_path.data(), file_path.size()); |
| } |
| for (size_t i = 0; i < num_rows; ++i) { |
| int64_t row_pos = static_cast<int64_t>(row_ids[i]); |
| row_pos_col.insert_data(reinterpret_cast<const char*>(&row_pos), sizeof(row_pos)); |
| } |
| for (size_t i = 0; i < num_rows; ++i) { |
| int32_t spec_id = partition_spec_id; |
| spec_id_col.insert_data(reinterpret_cast<const char*>(&spec_id), sizeof(spec_id)); |
| } |
| for (size_t i = 0; i < num_rows; ++i) { |
| partition_data_col.insert_data(partition_data_json.data(), partition_data_json.size()); |
| } |
| if (nullable_col != nullptr) { |
| nullable_col->get_null_map_data().resize_fill(num_rows, 0); |
| } |
| *column_out = std::move(column); |
| return Status::OK(); |
| } |
| |
| struct IcebergProfile { |
| RuntimeProfile::Counter* num_delete_files; |
| RuntimeProfile::Counter* num_delete_rows; |
| RuntimeProfile::Counter* delete_files_read_time; |
| RuntimeProfile::Counter* delete_rows_sort_time; |
| RuntimeProfile::Counter* parse_delete_file_time; |
| }; |
| |
| bool _need_row_id_column = false; |
| std::string _current_file_path; |
| int32_t _partition_spec_id = 0; |
| std::string _partition_data_json; |
| |
| ShardedKVCache* _kv_cache; |
| IcebergProfile _iceberg_profile; |
| const std::vector<int64_t>* _iceberg_delete_rows = nullptr; |
| std::vector<std::string> _expand_col_names; |
| std::vector<ColumnWithTypeAndName> _expand_columns; |
| std::vector<std::string> _all_required_col_names; |
| Fileformat _file_format = Fileformat::NONE; |
| |
| const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; |
| const std::string ICEBERG_FILE_PATH = "file_path"; |
| const std::string ICEBERG_ROW_POS = "pos"; |
| const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS}; |
| const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = { |
| {ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}}; |
| const int ICEBERG_FILE_PATH_INDEX = 0; |
| const int ICEBERG_FILE_POS_INDEX = 1; |
| const int READ_DELETE_FILE_BATCH_SIZE = 102400; |
| |
| // all ids that need read for eq delete (from all eq delete files) |
| std::set<int> _equality_delete_col_ids; |
| // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls |
| std::map<std::vector<int>, int> _equality_delete_block_map; |
| // EqualityDeleteBase stores raw pointers to these blocks, so do not modify this vector after |
| // creating entries in _equality_delete_impls. |
| std::vector<Block> _equality_delete_blocks; |
| std::vector<std::unique_ptr<EqualityDeleteBase>> _equality_delete_impls; |
| |
| // id -> block column name |
| std::unordered_map<int, std::string> _id_to_block_column_name; |
| |
| // File column names used during init |
| std::vector<std::string> _file_col_names; |
| |
| std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> |
| _create_topn_row_id_column_iterator; |
| |
| static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id"; |
| static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = |
| "_last_updated_sequence_number"; |
| struct RowLineageColumns { |
| int64_t first_row_id = -1; |
| int64_t last_updated_sequence_number = -1; |
| }; |
| RowLineageColumns _row_lineage_columns; |
| }; |
| |
| // ============================================================================ |
| // Template method implementations (must be in header for templates) |
| // ============================================================================ |
| |
| template <typename BaseReader> |
| Status IcebergReaderMixin<BaseReader>::_init_row_filters() { |
| // COUNT(*) short-circuit |
| if (this->_push_down_agg_type == TPushAggOp::type::COUNT && |
| this->get_scan_range().table_format_params.__isset.table_level_row_count && |
| this->get_scan_range().table_format_params.table_level_row_count > 0) { |
| return Status::OK(); |
| } |
| |
| const auto& table_desc = this->get_scan_range().table_format_params.iceberg_params; |
| const auto& version = table_desc.format_version; |
| if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { |
| return Status::OK(); |
| } |
| |
| std::vector<TIcebergDeleteFileDesc> position_delete_files; |
| std::vector<TIcebergDeleteFileDesc> equality_delete_files; |
| std::vector<TIcebergDeleteFileDesc> deletion_vector_files; |
| for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) { |
| if (desc.content == POSITION_DELETE) { |
| position_delete_files.emplace_back(desc); |
| } else if (desc.content == EQUALITY_DELETE) { |
| equality_delete_files.emplace_back(desc); |
| } else if (desc.content == DELETION_VECTOR) { |
| deletion_vector_files.emplace_back(desc); |
| } |
| } |
| |
| if (!equality_delete_files.empty()) { |
| RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); |
| this->set_push_down_agg_type(TPushAggOp::NONE); |
| } |
| |
| if (!deletion_vector_files.empty()) { |
| if (deletion_vector_files.size() != 1) [[unlikely]] { |
| /* |
| * Deletion vectors are a binary representation of deletes for a single data file that is more efficient |
| * at execution time than position delete files. Unlike equality or position delete files, there can be |
| * at most one deletion vector for a given data file in a snapshot. |
| */ |
| return Status::DataQualityError("This iceberg data file has multiple DVs."); |
| } |
| RETURN_IF_ERROR( |
| read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0])); |
| this->set_push_down_agg_type(TPushAggOp::NONE); |
| } else if (!position_delete_files.empty()) { |
| RETURN_IF_ERROR( |
| _position_delete_base(table_desc.original_file_path, position_delete_files)); |
| this->set_push_down_agg_type(TPushAggOp::NONE); |
| } |
| |
| COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size()); |
| return Status::OK(); |
| } |
| |
| template <typename BaseReader> |
| Status IcebergReaderMixin<BaseReader>::_equality_delete_base( |
| const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
| std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> |
| partition_columns; |
| std::unordered_map<std::string, VExprContextSPtr> missing_columns; |
| |
| for (const auto& delete_file : delete_files) { |
| TFileRangeDesc delete_desc; |
| delete_desc.__set_fs_name(this->get_scan_range().fs_name); |
| delete_desc.path = delete_file.path; |
| delete_desc.start_offset = 0; |
| delete_desc.size = -1; |
| delete_desc.file_size = -1; |
| |
| if (!delete_file.__isset.field_ids) [[unlikely]] { |
| return Status::InternalError( |
| "missing delete field ids when reading equality delete file"); |
| } |
| auto& read_column_field_ids = delete_file.field_ids; |
| std::set<int> read_column_field_ids_set; |
| for (const auto& field_id : read_column_field_ids) { |
| read_column_field_ids_set.insert(field_id); |
| _equality_delete_col_ids.insert(field_id); |
| } |
| |
| std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc); |
| RETURN_IF_ERROR(delete_reader->init_schema_reader()); |
| |
| std::vector<std::string> equality_delete_col_names; |
| std::vector<DataTypePtr> equality_delete_col_types; |
| |
| // Build delete col names/types/ids by matching field_ids from delete file schema. |
| // Master iterates delete file's FieldDescriptor and uses field_id to match, |
| // NOT idx-based pairing (get_parsed_schema order != field_ids order). |
| std::vector<std::string> delete_col_names; |
| std::vector<DataTypePtr> delete_col_types; |
| std::vector<int> delete_col_ids; |
| std::unordered_map<std::string, uint32_t> delete_col_name_to_block_idx; |
| |
| if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) { |
| const FieldDescriptor* delete_field_desc = nullptr; |
| RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&delete_field_desc)); |
| DCHECK(delete_field_desc != nullptr); |
| |
| for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) { |
| if (delete_file_field.field_id == -1) [[unlikely]] { |
| return Status::DataQualityError( |
| "missing field id when reading equality delete file"); |
| } |
| if (!read_column_field_ids_set.contains(delete_file_field.field_id)) { |
| continue; |
| } |
| if (delete_file_field.children.size() > 0) [[unlikely]] { |
| return Status::InternalError( |
| "can not support read complex column in equality delete file"); |
| } |
| |
| delete_col_ids.emplace_back(delete_file_field.field_id); |
| delete_col_names.emplace_back(delete_file_field.name); |
| delete_col_types.emplace_back(make_nullable(delete_file_field.data_type)); |
| |
| int field_id = delete_file_field.field_id; |
| if (!_id_to_block_column_name.contains(field_id)) { |
| _id_to_block_column_name.emplace(field_id, delete_file_field.name); |
| _expand_col_names.emplace_back(delete_file_field.name); |
| _expand_columns.emplace_back( |
| make_nullable(delete_file_field.data_type)->create_column(), |
| make_nullable(delete_file_field.data_type), delete_file_field.name); |
| } |
| } |
| for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) { |
| delete_col_name_to_block_idx[delete_col_names[idx]] = idx; |
| } |
| // Delete files have TFileRangeDesc.size=-1, which would cause |
| // set_fill_columns to return EndOfFile("No row group to read") |
| // when _filter_groups is true. Master passes filter_groups=false. |
| ParquetInitContext eq_delete_ctx; |
| eq_delete_ctx.filter_groups = false; |
| eq_delete_ctx.column_names = delete_col_names; |
| eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx; |
| auto st2 = parquet_reader->init_reader(&eq_delete_ctx); |
| if (!st2.ok()) { |
| return st2; |
| } |
| } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) { |
| // For ORC: use get_parsed_schema with field_ids from delete_file |
| // ORC field_ids come from the Thrift descriptor, not from ORC metadata |
| RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names, |
| &equality_delete_col_types)); |
| for (uint32_t idx = 0; idx < equality_delete_col_names.size(); ++idx) { |
| if (idx < read_column_field_ids.size()) { |
| int field_id = read_column_field_ids[idx]; |
| if (!read_column_field_ids_set.contains(field_id)) continue; |
| delete_col_ids.emplace_back(field_id); |
| delete_col_names.emplace_back(equality_delete_col_names[idx]); |
| delete_col_types.emplace_back(make_nullable(equality_delete_col_types[idx])); |
| if (!_id_to_block_column_name.contains(field_id)) { |
| _id_to_block_column_name.emplace(field_id, equality_delete_col_names[idx]); |
| _expand_col_names.emplace_back(equality_delete_col_names[idx]); |
| _expand_columns.emplace_back( |
| make_nullable(equality_delete_col_types[idx])->create_column(), |
| make_nullable(equality_delete_col_types[idx]), |
| equality_delete_col_names[idx]); |
| } |
| } |
| } |
| for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) { |
| delete_col_name_to_block_idx[delete_col_names[idx]] = idx; |
| } |
| OrcInitContext eq_delete_ctx; |
| eq_delete_ctx.column_names = delete_col_names; |
| eq_delete_ctx.col_name_to_block_idx = &delete_col_name_to_block_idx; |
| RETURN_IF_ERROR(orc_reader->init_reader(&eq_delete_ctx)); |
| } else { |
| return Status::InternalError("Unsupported format of delete file"); |
| } |
| |
| if (!_equality_delete_block_map.contains(delete_col_ids)) { |
| _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size()); |
| Block block; |
| _generate_equality_delete_block(&block, delete_col_names, delete_col_types); |
| _equality_delete_blocks.emplace_back(block); |
| } |
| Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]]; |
| |
| bool eof = false; |
| while (!eof) { |
| Block tmp_block; |
| _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types); |
| size_t read_rows = 0; |
| auto st = delete_reader->get_next_block(&tmp_block, &read_rows, &eof); |
| if (!st.ok()) { |
| return st; |
| } |
| if (read_rows > 0) { |
| ScopedMutableBlock scoped_mutable_block(&eq_file_block); |
| auto& mutable_block = scoped_mutable_block.mutable_block(); |
| RETURN_IF_ERROR(mutable_block.merge(tmp_block)); |
| } |
| } |
| } |
| |
| for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) { |
| auto& eq_file_block = _equality_delete_blocks[block_idx]; |
| auto equality_delete_impl = |
| EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids); |
| RETURN_IF_ERROR(equality_delete_impl->init(this->get_profile())); |
| _equality_delete_impls.emplace_back(std::move(equality_delete_impl)); |
| } |
| return Status::OK(); |
| } |
| |
| template <typename BaseReader> |
| void IcebergReaderMixin<BaseReader>::_generate_equality_delete_block( |
| Block* block, const std::vector<std::string>& equality_delete_col_names, |
| const std::vector<DataTypePtr>& equality_delete_col_types) { |
| for (int i = 0; i < equality_delete_col_names.size(); ++i) { |
| DataTypePtr data_type = make_nullable(equality_delete_col_types[i]); |
| MutableColumnPtr data_column = data_type->create_column(); |
| block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, |
| equality_delete_col_names[i])); |
| } |
| } |
| |
| template <typename BaseReader> |
| Status IcebergReaderMixin<BaseReader>::_expand_block_if_need(Block* block) { |
| std::set<std::string> names; |
| auto block_names = block->get_names(); |
| names.insert(block_names.begin(), block_names.end()); |
| for (auto& col : _expand_columns) { |
| if (names.contains(col.name)) { |
| return Status::InternalError("Wrong expand column '{}'", col.name); |
| } |
| names.insert(col.name); |
| (*this->col_name_to_block_idx_ref())[col.name] = block->columns(); |
| block->insert({col.type->create_column(), col.type, col.name}); |
| } |
| return Status::OK(); |
| } |
| |
| template <typename BaseReader> |
| Status IcebergReaderMixin<BaseReader>::_shrink_block_if_need(Block* block) { |
| std::set<size_t> positions_to_erase; |
| for (const std::string& expand_col : _expand_col_names) { |
| if (!this->col_name_to_block_idx_ref()->contains(expand_col)) { |
| return Status::InternalError("Wrong erase column '{}', block: {}", expand_col, |
| block->dump_names()); |
| } |
| positions_to_erase.emplace((*this->col_name_to_block_idx_ref())[expand_col]); |
| } |
| block->erase(positions_to_erase); |
| for (const std::string& expand_col : _expand_col_names) { |
| this->col_name_to_block_idx_ref()->erase(expand_col); |
| } |
| return Status::OK(); |
| } |
| |
| template <typename BaseReader> |
| Status IcebergReaderMixin<BaseReader>::_position_delete_base( |
| const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
| std::vector<DeleteRows*> delete_rows_array; |
| int64_t num_delete_rows = 0; |
| for (const auto& delete_file : delete_files) { |
| SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
| Status create_status = Status::OK(); |
| auto* delete_file_cache = _kv_cache->template get<DeleteFile>( |
| _delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* { |
| auto* position_delete = new DeleteFile; |
| TFileRangeDesc delete_file_range; |
| delete_file_range.__set_fs_name(this->get_scan_range().fs_name); |
| delete_file_range.path = delete_file.path; |
| delete_file_range.start_offset = 0; |
| delete_file_range.size = -1; |
| delete_file_range.file_size = -1; |
| create_status = _read_position_delete_file(&delete_file_range, position_delete); |
| if (!create_status) { |
| return nullptr; |
| } |
| return position_delete; |
| }); |
| if (create_status.is<ErrorCode::END_OF_FILE>()) { |
| continue; |
| } else if (!create_status.ok()) { |
| return create_status; |
| } |
| |
| DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache); |
| auto get_value = [&](const auto& v) { |
| DeleteRows* row_ids = v.second.get(); |
| if (!row_ids->empty()) { |
| delete_rows_array.emplace_back(row_ids); |
| num_delete_rows += row_ids->size(); |
| } |
| }; |
| delete_file_map.if_contains(data_file_path, get_value); |
| } |
| if (num_delete_rows > 0) { |
| SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time); |
| _iceberg_delete_rows = |
| _kv_cache->template get<DeleteRows>(data_file_path, [&]() -> DeleteRows* { |
| auto* data_file_position_delete = new DeleteRows; |
| _sort_delete_rows(delete_rows_array, num_delete_rows, |
| *data_file_position_delete); |
| return data_file_position_delete; |
| }); |
| set_delete_rows(); |
| COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows); |
| } |
| return Status::OK(); |
| } |
| |
| template <typename BaseReader> |
| typename IcebergReaderMixin<BaseReader>::PositionDeleteRange |
| IcebergReaderMixin<BaseReader>::_get_range(const ColumnDictI32& file_path_column) { |
| PositionDeleteRange range; |
| size_t read_rows = file_path_column.get_data().size(); |
| const int* code_path = file_path_column.get_data().data(); |
| const int* code_path_start = code_path; |
| const int* code_path_end = code_path + read_rows; |
| while (code_path < code_path_end) { |
| int code = code_path[0]; |
| const int* code_end = std::upper_bound(code_path, code_path_end, code); |
| range.data_file_path.emplace_back(file_path_column.get_value(code).to_string()); |
| range.range.emplace_back(code_path - code_path_start, code_end - code_path_start); |
| code_path = code_end; |
| } |
| return range; |
| } |
| |
| template <typename BaseReader> |
| typename IcebergReaderMixin<BaseReader>::PositionDeleteRange |
| IcebergReaderMixin<BaseReader>::_get_range(const ColumnString& file_path_column) { |
| PositionDeleteRange range; |
| size_t read_rows = file_path_column.size(); |
| size_t index = 0; |
| while (index < read_rows) { |
| StringRef data_path = file_path_column.get_data_at(index); |
| size_t left = index - 1; |
| size_t right = read_rows; |
| while (left + 1 != right) { |
| size_t mid = left + (right - left) / 2; |
| if (file_path_column.get_data_at(mid) > data_path) { |
| right = mid; |
| } else { |
| left = mid; |
| } |
| } |
| range.data_file_path.emplace_back(data_path.to_string()); |
| range.range.emplace_back(index, left + 1); |
| index = left + 1; |
| } |
| return range; |
| } |
| |
| template <typename BaseReader> |
| void IcebergReaderMixin<BaseReader>::_sort_delete_rows( |
| const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows, |
| std::vector<int64_t>& result) { |
| if (delete_rows_array.empty()) { |
| return; |
| } |
| if (delete_rows_array.size() == 1) { |
| result.resize(num_delete_rows); |
| memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows); |
| return; |
| } |
| if (delete_rows_array.size() == 2) { |
| result.resize(num_delete_rows); |
| std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(), |
| delete_rows_array.back()->begin(), delete_rows_array.back()->end(), |
| result.begin()); |
| return; |
| } |
| |
| using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>; |
| result.resize(num_delete_rows); |
| auto row_id_iter = result.begin(); |
| auto iter_end = result.end(); |
| std::vector<vec_pair> rows_array; |
| for (auto* rows : delete_rows_array) { |
| if (!rows->empty()) { |
| rows_array.emplace_back(rows->begin(), rows->end()); |
| } |
| } |
| size_t array_size = rows_array.size(); |
| while (row_id_iter != iter_end) { |
| int64_t min_index = 0; |
| int64_t min = *rows_array[0].first; |
| for (size_t i = 0; i < array_size; ++i) { |
| if (*rows_array[i].first < min) { |
| min_index = i; |
| min = *rows_array[i].first; |
| } |
| } |
| *row_id_iter++ = min; |
| rows_array[min_index].first++; |
| if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) { |
| rows_array.erase(rows_array.begin() + min_index); |
| array_size--; |
| } |
| } |
| } |
| |
| template <typename BaseReader> |
| void IcebergReaderMixin<BaseReader>::_gen_position_delete_file_range( |
| Block& block, DeleteFile* position_delete, size_t read_rows, |
| bool file_path_column_dictionary_coded) { |
| SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
| auto name_to_pos_map = block.get_name_to_pos_map(); |
| ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column; |
| DCHECK_EQ(path_column->size(), read_rows); |
| ColumnPtr pos_column = block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column; |
| using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType; |
| const int64_t* src_data = assert_cast<const ColumnType&>(*pos_column).get_data().data(); |
| PositionDeleteRange range; |
| if (file_path_column_dictionary_coded) { |
| range = _get_range(assert_cast<const ColumnDictI32&>(*path_column)); |
| } else { |
| range = _get_range(assert_cast<const ColumnString&>(*path_column)); |
| } |
| for (int i = 0; i < range.range.size(); ++i) { |
| std::string key = range.data_file_path[i]; |
| auto iter = position_delete->find(key); |
| DeleteRows* delete_rows; |
| if (iter == position_delete->end()) { |
| delete_rows = new DeleteRows; |
| std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows); |
| (*position_delete)[key] = std::move(delete_rows_ptr); |
| } else { |
| delete_rows = iter->second.get(); |
| } |
| const int64_t* cpy_start = src_data + range.range[i].first; |
| const int64_t cpy_count = range.range[i].second - range.range[i].first; |
| int64_t origin_size = delete_rows->size(); |
| delete_rows->resize(origin_size + cpy_count); |
| int64_t* dest_position = &(*delete_rows)[origin_size]; |
| memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t)); |
| } |
| } |
| |
| template <typename BaseReader> |
| Status IcebergReaderMixin<BaseReader>::read_deletion_vector( |
| const std::string& data_file_path, const TIcebergDeleteFileDesc& delete_file_desc) { |
| Status create_status = Status::OK(); |
| SCOPED_TIMER(_iceberg_profile.delete_files_read_time); |
| _iceberg_delete_rows = _kv_cache->template get< |
| DeleteRows>(data_file_path, [&]() -> DeleteRows* { |
| auto* delete_rows = new DeleteRows; |
| |
| TFileRangeDesc delete_range; |
| delete_range.__set_fs_name(this->get_scan_range().fs_name); |
| delete_range.path = delete_file_desc.path; |
| delete_range.start_offset = delete_file_desc.content_offset; |
| delete_range.size = delete_file_desc.content_size_in_bytes; |
| delete_range.file_size = -1; |
| |
| DeletionVectorReader dv_reader(this->get_state(), this->get_profile(), |
| this->get_scan_params(), delete_range, this->get_io_ctx()); |
| create_status = dv_reader.open(); |
| if (!create_status.ok()) [[unlikely]] { |
| return nullptr; |
| } |
| |
| size_t buffer_size = delete_range.size; |
| std::vector<char> buf(buffer_size); |
| if (buffer_size < 12) [[unlikely]] { |
| create_status = Status::DataQualityError("Deletion vector file size too small: {}", |
| buffer_size); |
| return nullptr; |
| } |
| |
| create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size}); |
| if (!create_status) [[unlikely]] { |
| return nullptr; |
| } |
| |
| auto total_length = BigEndian::Load32(buf.data()); |
| if (total_length + 8 != buffer_size) [[unlikely]] { |
| create_status = Status::DataQualityError( |
| "Deletion vector length mismatch, expected: {}, actual: {}", total_length + 8, |
| buffer_size); |
| return nullptr; |
| } |
| |
| constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
| if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] { |
| create_status = Status::DataQualityError("Deletion vector magic number mismatch"); |
| return nullptr; |
| } |
| |
| roaring::Roaring64Map bitmap; |
| SCOPED_TIMER(_iceberg_profile.parse_delete_file_time); |
| try { |
| bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12); |
| } catch (const std::runtime_error& e) { |
| create_status = Status::DataQualityError("Decode roaring bitmap failed, {}", e.what()); |
| return nullptr; |
| } |
| |
| delete_rows->reserve(bitmap.cardinality()); |
| for (auto it = bitmap.begin(); it != bitmap.end(); it++) { |
| delete_rows->push_back(*it); |
| } |
| COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size()); |
| return delete_rows; |
| }); |
| |
| RETURN_IF_ERROR(create_status); |
| if (!_iceberg_delete_rows->empty()) [[likely]] { |
| set_delete_rows(); |
| } |
| return Status::OK(); |
| } |
| |
| } // namespace doris |