blob: ffbb18d1255ea680e994457ad9dc434364184df4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/status.h"
#include "exec/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/types.h"
#include "table_format_reader.h"
#include "vec/columns/column_dictionary.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/equality_delete.h"
#include "vec/exprs/vslot_ref.h"
// Forward declaration only. Nothing in this header refers to tparquet::KeyValue.
// NOTE(review): possibly needed only by the implementation file; remove if unused there.
namespace tparquet {
class KeyValue;
} // namespace tparquet
namespace doris {
#include "common/compile_check_begin.h"
class RowDescriptor;
class RuntimeState;
class SlotDescriptor;
class TFileRangeDesc;
class TFileScanRangeParams;
class TIcebergDeleteFileDesc;
class TupleDescriptor;
namespace io {
struct IOContext;
} // namespace io
namespace vectorized {
template <typename T>
class ColumnStr;
using ColumnString = ColumnStr<UInt32>;
class Block;
class GenericReader;
class ShardedKVCache;
class VExprContext;
class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper {
public:
struct PositionDeleteRange {
std::vector<std::string> data_file_path;
std::vector<std::pair<int, int>> range;
};
IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
FileMetaCache* meta_cache);
~IcebergTableReader() override = default;
Status init_row_filters() final;
Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR };
enum Fileformat { NONE, PARQUET, ORC, AVRO };
virtual void set_delete_rows() = 0;
Status read_deletion_vector(const std::string& data_file_path,
const TIcebergDeleteFileDesc& delete_file_desc);
protected:
struct IcebergProfile {
RuntimeProfile::Counter* num_delete_files;
RuntimeProfile::Counter* num_delete_rows;
RuntimeProfile::Counter* delete_files_read_time;
RuntimeProfile::Counter* delete_rows_sort_time;
RuntimeProfile::Counter* parse_delete_file_time;
};
using DeleteRows = std::vector<int64_t>;
using DeleteFile = phmap::parallel_flat_hash_map<
std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>, std::equal_to<>,
std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8,
std::mutex>;
/**
* https://iceberg.apache.org/spec/#position-delete-files
* The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
* Sorting by file_path allows filter pushdown by file in columnar storage formats.
* Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
*/
static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array,
int64_t num_delete_rows, std::vector<int64_t>& result);
PositionDeleteRange _get_range(const ColumnDictI32& file_path_column);
PositionDeleteRange _get_range(const ColumnString& file_path_column);
static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
Status _position_delete_base(const std::string data_file_path,
const std::vector<TIcebergDeleteFileDesc>& delete_files);
Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
virtual std::unique_ptr<GenericReader> _create_equality_reader(
const TFileRangeDesc& delete_desc) = 0;
void _generate_equality_delete_block(Block* block,
const std::vector<std::string>& equality_delete_col_names,
const std::vector<DataTypePtr>& equality_delete_col_types);
// Equality delete should read the primary columns. Add the missing columns
Status _expand_block_if_need(Block* block);
// Remove the added delete columns
Status _shrink_block_if_need(Block* block);
// owned by scan node
ShardedKVCache* _kv_cache;
IcebergProfile _iceberg_profile;
// _iceberg_delete_rows from kv_cache
const std::vector<int64_t>* _iceberg_delete_rows = nullptr;
std::vector<std::string> _expand_col_names;
std::vector<ColumnWithTypeAndName> _expand_columns;
std::vector<std::string> _all_required_col_names;
// Pointer to external column name to block index mapping (from FileScanner)
// Used to dynamically add expand columns for equality delete
std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;
Fileformat _file_format = Fileformat::NONE;
const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
const std::string ICEBERG_FILE_PATH = "file_path";
const std::string ICEBERG_ROW_POS = "pos";
const std::vector<std::string> delete_file_col_names {ICEBERG_FILE_PATH, ICEBERG_ROW_POS};
const std::unordered_map<std::string, uint32_t> DELETE_COL_NAME_TO_BLOCK_IDX = {
{ICEBERG_FILE_PATH, 0}, {ICEBERG_ROW_POS, 1}};
const int ICEBERG_FILE_PATH_INDEX = 0;
const int ICEBERG_FILE_POS_INDEX = 1;
const int READ_DELETE_FILE_BATCH_SIZE = 102400;
//Read position_delete_file TFileRangeDesc, generate DeleteFile
virtual Status _read_position_delete_file(const TFileRangeDesc*, DeleteFile*) = 0;
void _gen_position_delete_file_range(Block& block, DeleteFile* const position_delete,
size_t read_rows, bool file_path_column_dictionary_coded);
// equality delete
Block _equality_delete_block;
std::unique_ptr<EqualityDeleteBase> _equality_delete_impl;
};
class IcebergParquetReader final : public IcebergTableReader {
public:
ENABLE_FACTORY_CREATOR(IcebergParquetReader);
IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx, FileMetaCache* meta_cache)
: IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
kv_cache, io_ctx, meta_cache) {}
Status init_reader(
const std::vector<std::string>& file_col_names,
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
void set_delete_rows() final {
auto* parquet_reader = (ParquetReader*)(_file_format_reader.get());
parquet_reader->set_delete_rows(_iceberg_delete_rows);
}
protected:
std::unique_ptr<GenericReader> _create_equality_reader(
const TFileRangeDesc& delete_desc) final {
return ParquetReader::create_unique(_profile, _params, delete_desc,
READ_DELETE_FILE_BATCH_SIZE, &_state->timezone_obj(),
_io_ctx, _state, _meta_cache);
}
private:
static ColumnIdResult _create_column_ids(const FieldDescriptor* field_desc,
const TupleDescriptor* tuple_descriptor);
Status _read_position_delete_file(const TFileRangeDesc* delete_range,
DeleteFile* position_delete) final;
};
class IcebergOrcReader final : public IcebergTableReader {
public:
ENABLE_FACTORY_CREATOR(IcebergOrcReader);
Status _read_position_delete_file(const TFileRangeDesc* delete_range,
DeleteFile* position_delete) final;
IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
FileMetaCache* meta_cache)
: IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
kv_cache, io_ctx, meta_cache) {}
void set_delete_rows() final {
auto* orc_reader = (OrcReader*)_file_format_reader.get();
orc_reader->set_position_delete_rowids(_iceberg_delete_rows);
}
Status init_reader(
const std::vector<std::string>& file_col_names,
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
protected:
std::unique_ptr<GenericReader> _create_equality_reader(
const TFileRangeDesc& delete_desc) override {
return OrcReader::create_unique(_profile, _state, _params, delete_desc,
READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx,
_meta_cache);
}
private:
static ColumnIdResult _create_column_ids(const orc::Type* orc_type,
const TupleDescriptor* tuple_descriptor);
private:
static const std::string ICEBERG_ORC_ATTRIBUTE;
};
} // namespace vectorized
#include "common/compile_check_end.h"
} // namespace doris