| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "format/reader/table_reader.h" |
| |
| #include <arrow/api.h> |
| #include <arrow/io/api.h> |
| #include <gtest/gtest.h> |
| #include <parquet/api/reader.h> |
| #include <parquet/arrow/writer.h> |
| |
| #include <algorithm> |
| #include <filesystem> |
| #include <fstream> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include "core/assert_cast.h" |
| #include "core/block/block.h" |
| #include "core/column/column_array.h" |
| #include "core/column/column_map.h" |
| #include "core/column/column_nullable.h" |
| #include "core/column/column_string.h" |
| #include "core/column/column_struct.h" |
| #include "core/column/column_vector.h" |
| #include "core/data_type/data_type_array.h" |
| #include "core/data_type/data_type_map.h" |
| #include "core/data_type/data_type_nullable.h" |
| #include "core/data_type/data_type_number.h" |
| #include "core/data_type/data_type_string.h" |
| #include "core/data_type/data_type_struct.h" |
| #include "exec/common/endian.h" |
| #include "exprs/vexpr.h" |
| #include "format/format_common.h" |
| #include "format/reader/expr/slot_ref.h" |
| #include "format/table/deletion_vector_reader.h" |
| #include "format/table/iceberg_reader_v2.h" |
| #include "gen_cpp/PlanNodes_types.h" |
| #include "io/io_common.h" |
| #include "roaring/roaring64map.hh" |
| #include "runtime/runtime_profile.h" |
| #include "runtime/runtime_state.h" |
| #include "storage/predicate/predicate_creator.h" |
| |
| namespace doris::reader { |
| namespace { |
| |
| FieldProjection field_projection(ColumnId column_id) { |
| return FieldProjection {.field_id = column_id}; |
| } |
| |
| std::vector<ColumnId> projection_ids(const std::vector<FieldProjection>& projections) { |
| std::vector<ColumnId> ids; |
| ids.reserve(projections.size()); |
| for (const auto& projection : projections) { |
| ids.push_back(projection.field_id); |
| } |
| return ids; |
| } |
| |
| class TableInt32GreaterThanExpr final : public VExpr { |
| public: |
| TableInt32GreaterThanExpr(int slot_id, int column_id, int32_t value) |
| : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) { |
| add_child(TableSlotRef::create_shared(slot_id, column_id, -1, |
| std::make_shared<DataTypeInt32>(), "id")); |
| set_node_type(TExprNodeType::BINARY_PRED); |
| _opcode = TExprOpcode::GT; |
| } |
| |
| Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector, |
| size_t count, ColumnPtr& result_column) const override { |
| const auto* slot_ref = assert_cast<const VSlotRef*>(get_child(0).get()); |
| const auto& input = assert_cast<const ColumnInt32&>( |
| *block->get_by_position(slot_ref->column_id()).column); |
| auto result = ColumnUInt8::create(); |
| auto& result_data = result->get_data(); |
| result_data.resize(count); |
| for (size_t row = 0; row < count; ++row) { |
| const size_t input_row = selector == nullptr ? row : (*selector)[row]; |
| result_data[row] = input.get_element(input_row) > _value; |
| } |
| result_column = std::move(result); |
| return Status::OK(); |
| } |
| |
| const std::string& expr_name() const override { return _expr_name; } |
| |
| private: |
| const int32_t _value; |
| const std::string _expr_name = "TableInt32GreaterThanExpr"; |
| }; |
| |
| class IcebergTableReaderDeleteFileTestHelper final : public doris::iceberg::IcebergTableReader { |
| public: |
| Status parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc, |
| bool* has_delete_file) { |
| return _parse_deletion_vector_file(t_desc, desc, has_delete_file); |
| } |
| }; |
| |
| class IcebergTableReaderScanRequestTestHelper final : public doris::iceberg::IcebergTableReader { |
| public: |
| Status init_for_scan_request_test(std::vector<TableColumn> projected_columns) { |
| _query_options = std::make_unique<TQueryOptions>(); |
| _query_globals = std::make_unique<TQueryGlobals>(); |
| _state = std::make_unique<RuntimeState>(*_query_options, *_query_globals); |
| RETURN_IF_ERROR(init({ |
| .projected_columns = std::move(projected_columns), |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = _state.get(), |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| })); |
| |
| SplitReadOptions split_options; |
| split_options.current_range.__set_path("scan-request-test.parquet"); |
| TTableFormatFileDesc table_format_params; |
| TIcebergFileDesc iceberg_params; |
| iceberg_params.__set_first_row_id(1000); |
| table_format_params.__set_iceberg_params(iceberg_params); |
| split_options.current_range.__set_table_format_params(table_format_params); |
| RETURN_IF_ERROR(prepare_split(split_options)); |
| |
| _delete_rows_storage = {1}; |
| _delete_rows = &_delete_rows_storage; |
| return Status::OK(); |
| } |
| |
| Status customize_request(FileScanRequest* request) { |
| return customize_file_scan_request(request); |
| } |
| |
| private: |
| std::unique_ptr<TQueryOptions> _query_options; |
| std::unique_ptr<TQueryGlobals> _query_globals; |
| std::unique_ptr<RuntimeState> _state; |
| DeleteRows _delete_rows_storage; |
| }; |
| |
| class TableInt32SumGreaterThanExpr final : public VExpr { |
| public: |
| TableInt32SumGreaterThanExpr(int left_slot_id, int left_column_id, int right_slot_id, |
| int right_column_id, int32_t value) |
| : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) { |
| add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1, |
| std::make_shared<DataTypeInt32>(), "id")); |
| add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, -1, |
| std::make_shared<DataTypeInt32>(), "score")); |
| set_node_type(TExprNodeType::BINARY_PRED); |
| _opcode = TExprOpcode::GT; |
| } |
| |
| Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector, |
| size_t count, ColumnPtr& result_column) const override { |
| const auto* left_slot_ref = assert_cast<const VSlotRef*>(get_child(0).get()); |
| const auto* right_slot_ref = assert_cast<const VSlotRef*>(get_child(1).get()); |
| const auto& left_input = assert_cast<const ColumnInt32&>( |
| *block->get_by_position(left_slot_ref->column_id()).column); |
| const auto& right_input = assert_cast<const ColumnInt32&>( |
| *block->get_by_position(right_slot_ref->column_id()).column); |
| auto result = ColumnUInt8::create(); |
| auto& result_data = result->get_data(); |
| result_data.resize(count); |
| for (size_t row = 0; row < count; ++row) { |
| const size_t input_row = selector == nullptr ? row : (*selector)[row]; |
| result_data[row] = |
| left_input.get_element(input_row) + right_input.get_element(input_row) > _value; |
| } |
| result_column = std::move(result); |
| return Status::OK(); |
| } |
| |
| const std::string& expr_name() const override { return _expr_name; } |
| |
| private: |
| const int32_t _value; |
| const std::string _expr_name = "TableInt32SumGreaterThanExpr"; |
| }; |
| |
| class TableInt32SumLessThanExpr final : public VExpr { |
| public: |
| TableInt32SumLessThanExpr(int left_slot_id, int left_column_id, int right_slot_id, |
| int right_column_id, int32_t value) |
| : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) { |
| add_child(TableSlotRef::create_shared(left_slot_id, left_column_id, -1, |
| std::make_shared<DataTypeInt32>(), "id")); |
| add_child(TableSlotRef::create_shared(right_slot_id, right_column_id, -1, |
| std::make_shared<DataTypeInt32>(), "score")); |
| set_node_type(TExprNodeType::BINARY_PRED); |
| _opcode = TExprOpcode::LT; |
| } |
| |
| Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector, |
| size_t count, ColumnPtr& result_column) const override { |
| const auto* left_slot_ref = assert_cast<const VSlotRef*>(get_child(0).get()); |
| const auto* right_slot_ref = assert_cast<const VSlotRef*>(get_child(1).get()); |
| const auto& left_input = assert_cast<const ColumnInt32&>( |
| *block->get_by_position(left_slot_ref->column_id()).column); |
| const auto& right_input = assert_cast<const ColumnInt32&>( |
| *block->get_by_position(right_slot_ref->column_id()).column); |
| auto result = ColumnUInt8::create(); |
| auto& result_data = result->get_data(); |
| result_data.resize(count); |
| for (size_t row = 0; row < count; ++row) { |
| const size_t input_row = selector == nullptr ? row : (*selector)[row]; |
| result_data[row] = |
| left_input.get_element(input_row) + right_input.get_element(input_row) < _value; |
| } |
| result_column = std::move(result); |
| return Status::OK(); |
| } |
| |
| const std::string& expr_name() const override { return _expr_name; } |
| |
| private: |
| const int32_t _value; |
| const std::string _expr_name = "TableInt32SumLessThanExpr"; |
| }; |
| |
| std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) { |
| std::shared_ptr<arrow::Array> array; |
| EXPECT_TRUE(builder->Finish(&array).ok()); |
| return array; |
| } |
| |
| std::shared_ptr<arrow::Array> build_int32_array(const std::vector<int32_t>& values) { |
| arrow::Int32Builder builder; |
| for (const auto value : values) { |
| EXPECT_TRUE(builder.Append(value).ok()); |
| } |
| return finish_array(&builder); |
| } |
| |
| std::shared_ptr<arrow::Array> build_int64_array(const std::vector<int64_t>& values) { |
| arrow::Int64Builder builder; |
| for (const auto value : values) { |
| EXPECT_TRUE(builder.Append(value).ok()); |
| } |
| return finish_array(&builder); |
| } |
| |
| std::shared_ptr<arrow::Array> build_string_array(const std::vector<std::string>& values) { |
| arrow::StringBuilder builder; |
| for (const auto& value : values) { |
| EXPECT_TRUE(builder.Append(value).ok()); |
| } |
| return finish_array(&builder); |
| } |
| |
| void write_parquet_file(const std::string& file_path, int32_t id, const std::string& value) { |
| auto schema = arrow::schema({ |
| arrow::field("id", arrow::int32(), false), |
| arrow::field("value", arrow::utf8(), false), |
| }); |
| auto table = arrow::Table::Make(schema, {build_int32_array({id}), build_string_array({value})}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder builder; |
| builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| builder.compression(::parquet::Compression::UNCOMPRESSED); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 1, |
| builder.build())); |
| } |
| |
| void write_iceberg_equality_delete_parquet_file(const std::string& file_path, int32_t field_id, |
| int32_t value) { |
| const auto metadata = |
| arrow::key_value_metadata({"PARQUET:field_id"}, {std::to_string(field_id)}); |
| auto schema = arrow::schema({ |
| arrow::field("id", arrow::int32(), false)->WithMetadata(metadata), |
| }); |
| auto table = arrow::Table::Make(schema, {build_int32_array({value})}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder builder; |
| builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| builder.compression(::parquet::Compression::UNCOMPRESSED); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 1, |
| builder.build())); |
| } |
| |
| void write_iceberg_equality_delete_bigint_parquet_file(const std::string& file_path, |
| int32_t field_id, int64_t value) { |
| const auto metadata = |
| arrow::key_value_metadata({"PARQUET:field_id"}, {std::to_string(field_id)}); |
| auto schema = arrow::schema({ |
| arrow::field("id", arrow::int64(), false)->WithMetadata(metadata), |
| }); |
| auto table = arrow::Table::Make(schema, {build_int64_array({value})}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder builder; |
| builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| builder.compression(::parquet::Compression::UNCOMPRESSED); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 1, |
| builder.build())); |
| } |
| |
| void write_struct_parquet_file(const std::string& file_path, int32_t id) { |
| auto struct_type = arrow::struct_({arrow::field("id", arrow::int32(), false)}); |
| arrow::StructBuilder builder( |
| struct_type, arrow::default_memory_pool(), |
| {std::make_shared<arrow::Int32Builder>(arrow::default_memory_pool())}); |
| auto* id_builder = assert_cast<arrow::Int32Builder*>(builder.field_builder(0)); |
| EXPECT_TRUE(builder.Append().ok()); |
| EXPECT_TRUE(id_builder->Append(id).ok()); |
| |
| auto schema = arrow::schema({ |
| arrow::field("s", struct_type, false), |
| }); |
| auto table = arrow::Table::Make(schema, {finish_array(&builder)}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder writer_builder; |
| writer_builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| writer_builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| writer_builder.compression(::parquet::Compression::UNCOMPRESSED); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 1, |
| writer_builder.build())); |
| } |
| |
| void write_struct_parquet_file(const std::string& file_path, const std::vector<int32_t>& ids, |
| int64_t row_group_size = -1) { |
| auto struct_type = arrow::struct_({arrow::field("id", arrow::int32(), false)}); |
| arrow::StructBuilder builder( |
| struct_type, arrow::default_memory_pool(), |
| {std::make_shared<arrow::Int32Builder>(arrow::default_memory_pool())}); |
| auto* id_builder = assert_cast<arrow::Int32Builder*>(builder.field_builder(0)); |
| for (const auto id : ids) { |
| EXPECT_TRUE(builder.Append().ok()); |
| EXPECT_TRUE(id_builder->Append(id).ok()); |
| } |
| |
| auto schema = arrow::schema({ |
| arrow::field("s", struct_type, false), |
| }); |
| auto table = arrow::Table::Make(schema, {finish_array(&builder)}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder writer_builder; |
| writer_builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| writer_builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| writer_builder.compression(::parquet::Compression::UNCOMPRESSED); |
| const auto write_row_group_size = |
| row_group_size > 0 ? row_group_size : static_cast<int64_t>(ids.size()); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, |
| write_row_group_size, |
| writer_builder.build())); |
| } |
| |
| void write_list_struct_parquet_file(const std::string& file_path) { |
| auto struct_type = arrow::struct_( |
| {arrow::field("a", arrow::int32(), false), arrow::field("b", arrow::utf8(), false)}); |
| std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders; |
| auto a_array_builder = std::make_unique<arrow::Int32Builder>(); |
| field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(a_array_builder))); |
| auto b_array_builder = std::make_unique<arrow::StringBuilder>(); |
| field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(b_array_builder))); |
| auto struct_builder = std::make_shared<arrow::StructBuilder>( |
| struct_type, arrow::default_memory_pool(), std::move(field_builders)); |
| auto list_type = arrow::list(arrow::field("element", struct_type, false)); |
| arrow::ListBuilder builder(arrow::default_memory_pool(), struct_builder, list_type); |
| auto* a_builder = assert_cast<arrow::Int32Builder*>(struct_builder->field_builder(0)); |
| auto* b_builder = assert_cast<arrow::StringBuilder*>(struct_builder->field_builder(1)); |
| |
| EXPECT_TRUE(builder.Append().ok()); |
| EXPECT_TRUE(struct_builder->Append().ok()); |
| EXPECT_TRUE(a_builder->Append(10).ok()); |
| EXPECT_TRUE(b_builder->Append("la").ok()); |
| EXPECT_TRUE(struct_builder->Append().ok()); |
| EXPECT_TRUE(a_builder->Append(20).ok()); |
| EXPECT_TRUE(b_builder->Append("lb").ok()); |
| |
| EXPECT_TRUE(builder.Append().ok()); |
| EXPECT_TRUE(struct_builder->Append().ok()); |
| EXPECT_TRUE(a_builder->Append(30).ok()); |
| EXPECT_TRUE(b_builder->Append("lc").ok()); |
| |
| EXPECT_TRUE(builder.AppendEmptyValue().ok()); |
| |
| auto schema = arrow::schema({ |
| arrow::field("xs", list_type, false), |
| }); |
| auto table = arrow::Table::Make(schema, {finish_array(&builder)}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder writer_builder; |
| writer_builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| writer_builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| writer_builder.compression(::parquet::Compression::UNCOMPRESSED); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 3, |
| writer_builder.build())); |
| } |
| |
| void write_map_struct_parquet_file(const std::string& file_path) { |
| auto key_builder = std::make_shared<arrow::Int32Builder>(); |
| auto struct_type = arrow::struct_( |
| {arrow::field("a", arrow::int32(), false), arrow::field("b", arrow::utf8(), false)}); |
| std::vector<std::shared_ptr<arrow::ArrayBuilder>> field_builders; |
| auto a_array_builder = std::make_unique<arrow::Int32Builder>(); |
| field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(a_array_builder))); |
| auto b_array_builder = std::make_unique<arrow::StringBuilder>(); |
| field_builders.push_back(std::shared_ptr<arrow::ArrayBuilder>(std::move(b_array_builder))); |
| auto value_builder = std::make_shared<arrow::StructBuilder>( |
| struct_type, arrow::default_memory_pool(), std::move(field_builders)); |
| auto map_type = arrow::map(arrow::int32(), arrow::field("value", struct_type, false)); |
| arrow::MapBuilder builder(arrow::default_memory_pool(), key_builder, value_builder, map_type); |
| auto* a_builder = assert_cast<arrow::Int32Builder*>(value_builder->field_builder(0)); |
| auto* b_builder = assert_cast<arrow::StringBuilder*>(value_builder->field_builder(1)); |
| |
| EXPECT_TRUE(builder.Append().ok()); |
| EXPECT_TRUE(key_builder->Append(1).ok()); |
| EXPECT_TRUE(value_builder->Append().ok()); |
| EXPECT_TRUE(a_builder->Append(10).ok()); |
| EXPECT_TRUE(b_builder->Append("ma").ok()); |
| EXPECT_TRUE(key_builder->Append(2).ok()); |
| EXPECT_TRUE(value_builder->Append().ok()); |
| EXPECT_TRUE(a_builder->Append(20).ok()); |
| EXPECT_TRUE(b_builder->Append("mb").ok()); |
| |
| EXPECT_TRUE(builder.Append().ok()); |
| EXPECT_TRUE(key_builder->Append(3).ok()); |
| EXPECT_TRUE(value_builder->Append().ok()); |
| EXPECT_TRUE(a_builder->Append(30).ok()); |
| EXPECT_TRUE(b_builder->Append("mc").ok()); |
| |
| EXPECT_TRUE(builder.AppendEmptyValue().ok()); |
| |
| auto schema = arrow::schema({ |
| arrow::field("kv", map_type, false), |
| }); |
| auto table = arrow::Table::Make(schema, {finish_array(&builder)}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder writer_builder; |
| writer_builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| writer_builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| writer_builder.compression(::parquet::Compression::UNCOMPRESSED); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, 3, |
| writer_builder.build())); |
| } |
| |
| void write_int_pair_parquet_file(const std::string& file_path, const std::vector<int32_t>& ids, |
| const std::vector<int32_t>& scores, |
| const std::vector<std::string>& values, |
| int64_t row_group_size = -1) { |
| const auto id_metadata = arrow::key_value_metadata({"PARQUET:field_id"}, {"0"}); |
| const auto score_metadata = arrow::key_value_metadata({"PARQUET:field_id"}, {"1"}); |
| const auto value_metadata = arrow::key_value_metadata({"PARQUET:field_id"}, {"2"}); |
| auto schema = arrow::schema({ |
| arrow::field("id", arrow::int32(), false)->WithMetadata(id_metadata), |
| arrow::field("score", arrow::int32(), false)->WithMetadata(score_metadata), |
| arrow::field("value", arrow::utf8(), false)->WithMetadata(value_metadata), |
| }); |
| auto table = arrow::Table::Make(schema, {build_int32_array(ids), build_int32_array(scores), |
| build_string_array(values)}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder builder; |
| builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| builder.compression(::parquet::Compression::UNCOMPRESSED); |
| const auto write_row_group_size = |
| row_group_size > 0 ? row_group_size : static_cast<int64_t>(ids.size()); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, |
| write_row_group_size, builder.build())); |
| } |
| |
| void write_position_delete_parquet_file(const std::string& file_path, |
| const std::vector<std::string>& data_file_paths, |
| const std::vector<int64_t>& positions) { |
| auto schema = arrow::schema({ |
| arrow::field("file_path", arrow::utf8(), false), |
| arrow::field("pos", arrow::int64(), false), |
| }); |
| auto table = arrow::Table::Make( |
| schema, {build_string_array(data_file_paths), build_int64_array(positions)}); |
| |
| auto file_result = arrow::io::FileOutputStream::Open(file_path); |
| ASSERT_TRUE(file_result.ok()) << file_result.status(); |
| std::shared_ptr<arrow::io::FileOutputStream> out = *file_result; |
| |
| ::parquet::WriterProperties::Builder builder; |
| builder.version(::parquet::ParquetVersion::PARQUET_2_6); |
| builder.data_page_version(::parquet::ParquetDataPageVersion::V2); |
| builder.compression(::parquet::Compression::UNCOMPRESSED); |
| PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out, |
| static_cast<int64_t>(positions.size()), |
| builder.build())); |
| } |
| |
| int64_t write_iceberg_deletion_vector_file(const std::string& file_path, |
| const std::vector<uint64_t>& deleted_positions) { |
| roaring::Roaring64Map rows; |
| for (const auto position : deleted_positions) { |
| rows.add(position); |
| } |
| |
| const size_t bitmap_size = rows.getSizeInBytes(); |
| std::vector<char> blob(4 + 4 + bitmap_size + 4); |
| rows.write(blob.data() + 8); |
| |
| const uint32_t total_length = static_cast<uint32_t>(4 + bitmap_size); |
| BigEndian::Store32(blob.data(), total_length); |
| constexpr char DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'}; |
| memcpy(blob.data() + 4, DV_MAGIC, 4); |
| BigEndian::Store32(blob.data() + 8 + bitmap_size, 0); |
| |
| std::ofstream output(file_path, std::ios::binary); |
| EXPECT_TRUE(output.is_open()); |
| output.write(blob.data(), static_cast<std::streamsize>(blob.size())); |
| EXPECT_TRUE(output.good()); |
| return static_cast<int64_t>(blob.size()); |
| } |
| |
| Block build_table_block(const std::vector<TableColumn>& columns) { |
| Block block; |
| for (const auto& column : columns) { |
| block.insert({column.type->create_column(), column.type, column.name}); |
| } |
| return block; |
| } |
| |
| void expect_nullable_int64_column_values(const IColumn& column, |
| const std::vector<int64_t>& expected_values) { |
| const auto full_column = column.convert_to_full_column_if_const(); |
| const auto& nullable_column = assert_cast<const ColumnNullable&>(*full_column); |
| const auto& values = |
| assert_cast<const ColumnInt64&>(nullable_column.get_nested_column()).get_data(); |
| ASSERT_EQ(nullable_column.size(), expected_values.size()); |
| for (size_t row = 0; row < expected_values.size(); ++row) { |
| EXPECT_EQ(nullable_column.get_null_map_data()[row], 0); |
| EXPECT_EQ(values[row], expected_values[row]); |
| } |
| } |
| |
| SplitReadOptions build_split_options(const std::string& file_path) { |
| SplitReadOptions options; |
| options.current_range.__set_path(file_path); |
| options.current_range.__set_file_size( |
| static_cast<int64_t>(std::filesystem::file_size(file_path))); |
| return options; |
| } |
| |
| void set_iceberg_row_lineage_params(SplitReadOptions* split_options, int64_t first_row_id, |
| int64_t last_updated_sequence_number) { |
| TTableFormatFileDesc table_format_params; |
| TIcebergFileDesc iceberg_params; |
| iceberg_params.__set_first_row_id(first_row_id); |
| iceberg_params.__set_last_updated_sequence_number(last_updated_sequence_number); |
| table_format_params.__set_iceberg_params(iceberg_params); |
| split_options->current_range.__set_table_format_params(table_format_params); |
| } |
| |
| TIcebergDeleteFileDesc make_iceberg_deletion_vector(const std::string& path, int64_t offset, |
| int64_t size) { |
| TIcebergDeleteFileDesc delete_file; |
| delete_file.__set_content(3); |
| delete_file.__set_path(path); |
| delete_file.__set_content_offset(offset); |
| delete_file.__set_content_size_in_bytes(size); |
| return delete_file; |
| } |
| |
| TIcebergDeleteFileDesc make_iceberg_position_delete_file(const std::string& path) { |
| TIcebergDeleteFileDesc delete_file; |
| delete_file.__set_content(1); |
| delete_file.__set_path(path); |
| delete_file.__set_file_format(TFileFormatType::FORMAT_PARQUET); |
| return delete_file; |
| } |
| |
| TIcebergDeleteFileDesc make_iceberg_equality_delete_file(const std::string& path, |
| const std::vector<int32_t>& field_ids) { |
| TIcebergDeleteFileDesc delete_file; |
| delete_file.__set_content(2); |
| delete_file.__set_path(path); |
| delete_file.__set_field_ids(field_ids); |
| delete_file.__set_file_format(TFileFormatType::FORMAT_PARQUET); |
| return delete_file; |
| } |
| |
| TFileScanRangeParams make_local_parquet_scan_params() { |
| TFileScanRangeParams scan_params; |
| scan_params.__set_file_type(TFileType::FILE_LOCAL); |
| scan_params.__set_format_type(TFileFormatType::FORMAT_PARQUET); |
| return scan_params; |
| } |
| |
| std::shared_ptr<io::IOContext> make_io_context(io::FileReaderStats* file_reader_stats, |
| io::FileCacheStatistics* file_cache_stats) { |
| auto io_ctx = std::make_shared<io::IOContext>(); |
| io_ctx->file_reader_stats = file_reader_stats; |
| io_ctx->file_cache_stats = file_cache_stats; |
| return io_ctx; |
| } |
| |
| std::unique_ptr<ReadProfile> make_table_read_profile(RuntimeProfile* profile) { |
| auto read_profile = std::make_unique<ReadProfile>(); |
| read_profile->num_delete_files = ADD_COUNTER(profile, "NumDeleteFiles", TUnit::UNIT); |
| read_profile->num_delete_rows = ADD_COUNTER(profile, "NumDeleteRows", TUnit::UNIT); |
| read_profile->parse_delete_file_time = ADD_TIMER(profile, "ParseDeleteFileTime"); |
| return read_profile; |
| } |
| |
| TTableFormatFileDesc make_iceberg_table_format_desc( |
| const std::string& data_file_path, |
| const std::vector<TIcebergDeleteFileDesc>& delete_files) { |
| TTableFormatFileDesc table_format_params; |
| TIcebergFileDesc iceberg_params; |
| iceberg_params.__set_format_version(2); |
| iceberg_params.__set_original_file_path(data_file_path); |
| iceberg_params.__set_delete_files(delete_files); |
| table_format_params.__set_iceberg_params(iceberg_params); |
| return table_format_params; |
| } |
| |
| std::vector<int32_t> read_iceberg_ids(doris::iceberg::IcebergTableReader* reader, |
| const std::vector<TableColumn>& projected_columns) { |
| std::vector<int32_t> ids; |
| bool eos = false; |
| while (!eos) { |
| Block block = build_table_block(projected_columns); |
| auto status = reader->get_block(&block, &eos); |
| if (!status.ok()) { |
| ADD_FAILURE() << status; |
| return ids; |
| } |
| if (block.rows() == 0) { |
| continue; |
| } |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| for (size_t row = 0; row < block.rows(); ++row) { |
| ids.push_back(id_column.get_element(row)); |
| } |
| } |
| return ids; |
| } |
| |
| int64_t parquet_column_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) { |
| return column_metadata.has_dictionary_page() |
| ? static_cast<int64_t>(column_metadata.dictionary_page_offset()) |
| : static_cast<int64_t>(column_metadata.data_page_offset()); |
| } |
| |
| SplitReadOptions build_split_options_for_row_group_mid(const std::string& file_path, |
| int row_group_idx) { |
| auto options = build_split_options(file_path); |
| auto reader = ::parquet::ParquetFileReader::OpenFile(file_path, false); |
| auto metadata = reader->metadata(); |
| auto row_group_metadata = metadata->RowGroup(row_group_idx); |
| auto first_column = row_group_metadata->ColumnChunk(0); |
| auto last_column = row_group_metadata->ColumnChunk(row_group_metadata->num_columns() - 1); |
| const int64_t row_group_start_offset = parquet_column_start_offset(*first_column); |
| const int64_t row_group_end_offset = |
| parquet_column_start_offset(*last_column) + last_column->total_compressed_size(); |
| const int64_t row_group_mid_offset = |
| row_group_start_offset + (row_group_end_offset - row_group_start_offset) / 2; |
| options.current_range.__set_start_offset(row_group_mid_offset); |
| options.current_range.__set_size(1); |
| return options; |
| } |
| |
| TableColumn make_table_column(ColumnId id, const std::string& name, const DataTypePtr& type) { |
| TableColumn column; |
| column.id = id; |
| column.name = name; |
| column.type = type; |
| return column; |
| } |
| |
| SchemaField make_schema_field(ColumnId id, const std::string& name, const DataTypePtr& type) { |
| SchemaField field; |
| field.id = id; |
| field.name = name; |
| field.type = type; |
| return field; |
| } |
| |
| TEST(TableReaderTest, ReopenSplitAfterClose) { |
| const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const std::vector<std::string> file_paths = { |
| (test_dir / "split_1.parquet").string(), |
| (test_dir / "split_2.parquet").string(), |
| (test_dir / "split_3.parquet").string(), |
| }; |
| write_parquet_file(file_paths[0], 1, "one"); |
| write_parquet_file(file_paths[1], 2, "two"); |
| write_parquet_file(file_paths[2], 3, "three"); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(1, "value", std::make_shared<DataTypeString>())); |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {VExprContext::create_shared( |
| std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0))}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| // Simulate the scanner lifecycle for three different splits: |
| // init() once, then repeat prepare_split() -> get_block() -> close(). |
| // This verifies TableReader::close() fully releases the previous low-level reader and task |
| // state, so a later prepare_split() can open and read a new split on the same TableReader. |
| // The table-level conjunct is also rebuilt for each split. The projection order puts value |
| // before id, so the pushed conjunct has to be rewritten to the ParquetReader file-local block |
| // position every time a new split is opened. |
| std::vector<int32_t> ids; |
| std::vector<std::string> values; |
| for (const auto& file_path : file_paths) { |
| auto split_options = build_split_options(file_path); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& value_column = |
| assert_cast<const ColumnString&>(*block.get_by_position(0).column); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(1).column); |
| ASSERT_EQ(id_column.size(), 1); |
| ASSERT_EQ(value_column.size(), 1); |
| ids.push_back(id_column.get_element(0)); |
| values.push_back(value_column.get_data_at(0).to_string()); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| } |
| |
| EXPECT_EQ(ids, std::vector<int32_t>({1, 2, 3})); |
| EXPECT_EQ(values, std::vector<std::string>({"one", "two", "three"})); |
| |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownCountFromNewParquetReader) { |
| const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_count_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 50}, |
| {"one", "two", "three", "four", "five"}, 2); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::COUNT, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 5); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownMinMaxFromNewParquetReader) { |
| const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_minmax_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {3, 1, 5, 2}, {30, 10, 50, 20}, |
| {"three", "one", "five", "two"}, 2); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| projected_columns.push_back(make_table_column(1, "score", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::MINMAX, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 2); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| const auto& score_column = assert_cast<const ColumnInt32&>(*block.get_by_position(1).column); |
| EXPECT_EQ(id_column.get_element(0), 1); |
| EXPECT_EQ(id_column.get_element(1), 5); |
| EXPECT_EQ(score_column.get_element(0), 10); |
| EXPECT_EQ(score_column.get_element(1), 50); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownMinMaxCastsFileValueToTableType) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_minmax_cast_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {3, 1, 5, 2}, {30, 10, 50, 20}, |
| {"three", "one", "five", "two"}, 2); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt64>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::MINMAX, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 2); |
| const auto& id_column = assert_cast<const ColumnInt64&>(*block.get_by_position(0).column); |
| EXPECT_EQ(id_column.get_element(0), 1); |
| EXPECT_EQ(id_column.get_element(1), 5); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownMinMaxFromProjectedStructLeaf) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_minmax_struct_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_struct_parquet_file(file_path, {3, 1, 5, 2}, 2); |
| |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| auto id_child = make_table_column(0, "id", int_type); |
| auto struct_type = std::make_shared<DataTypeStruct>(DataTypes {int_type}, Strings {"id"}); |
| auto struct_column = make_table_column(100, "s", struct_type); |
| struct_column.children = {id_child}; |
| const std::vector<TableColumn> projected_columns = {struct_column}; |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::MINMAX, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 2); |
| const auto& struct_result = assert_cast<const ColumnStruct&>(*block.get_by_position(0).column); |
| ASSERT_EQ(struct_result.get_columns().size(), 1); |
| const auto& ids = assert_cast<const ColumnInt32&>(struct_result.get_column(0)); |
| EXPECT_EQ(ids.get_element(0), 1); |
| EXPECT_EQ(ids.get_element(1), 5); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownMinMaxFallsBackForProjectedListStructLeaf) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_minmax_list_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_list_struct_parquet_file(file_path); |
| |
| const auto string_type = std::make_shared<DataTypeString>(); |
| auto b_child = make_table_column(1, "b", string_type); |
| auto element_type = std::make_shared<DataTypeStruct>(DataTypes {string_type}, Strings {"b"}); |
| auto element_child = make_table_column(0, "element", element_type); |
| element_child.children = {b_child}; |
| auto list_column = make_table_column(100, "xs", std::make_shared<DataTypeArray>(element_type)); |
| list_column.children = {element_child}; |
| const std::vector<TableColumn> projected_columns = {list_column}; |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::MINMAX, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 3); |
| const auto& array_result = assert_cast<const ColumnArray&>(*block.get_by_position(0).column); |
| EXPECT_EQ(array_result.get_offsets()[0], 2); |
| EXPECT_EQ(array_result.get_offsets()[1], 3); |
| EXPECT_EQ(array_result.get_offsets()[2], 3); |
| const auto& element_struct = assert_cast<const ColumnStruct&>(array_result.get_data()); |
| ASSERT_EQ(element_struct.get_columns().size(), 1); |
| const auto& b_values = assert_cast<const ColumnString&>(element_struct.get_column(0)); |
| EXPECT_EQ(b_values.get_data_at(0).to_string(), "la"); |
| EXPECT_EQ(b_values.get_data_at(1).to_string(), "lb"); |
| EXPECT_EQ(b_values.get_data_at(2).to_string(), "lc"); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownMinMaxFallsBackForProjectedMapValueStructLeaf) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_minmax_map_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_map_struct_parquet_file(file_path); |
| |
| const auto key_type = std::make_shared<DataTypeInt32>(); |
| const auto string_type = std::make_shared<DataTypeString>(); |
| auto b_child = make_table_column(1, "b", string_type); |
| auto value_type = std::make_shared<DataTypeStruct>(DataTypes {string_type}, Strings {"b"}); |
| auto value_child = make_table_column(1, "value", value_type); |
| value_child.children = {b_child}; |
| auto entry_type = std::make_shared<DataTypeStruct>(DataTypes {value_type}, Strings {"value"}); |
| auto entry_child = make_table_column(0, "entries", entry_type); |
| entry_child.children = {value_child}; |
| auto map_column = |
| make_table_column(100, "kv", std::make_shared<DataTypeMap>(key_type, value_type)); |
| map_column.children = {entry_child}; |
| const std::vector<TableColumn> projected_columns = {map_column}; |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::MINMAX, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 3); |
| const auto& map_result = assert_cast<const ColumnMap&>(*block.get_by_position(0).column); |
| EXPECT_EQ(map_result.get_offsets()[0], 2); |
| EXPECT_EQ(map_result.get_offsets()[1], 3); |
| EXPECT_EQ(map_result.get_offsets()[2], 3); |
| const auto& keys = assert_cast<const ColumnInt32&>(map_result.get_keys()); |
| EXPECT_EQ(keys.get_element(0), 1); |
| EXPECT_EQ(keys.get_element(1), 2); |
| EXPECT_EQ(keys.get_element(2), 3); |
| const auto& value_struct = assert_cast<const ColumnStruct&>(map_result.get_values()); |
| ASSERT_EQ(value_struct.get_columns().size(), 1); |
| const auto& b_values = assert_cast<const ColumnString&>(value_struct.get_column(0)); |
| EXPECT_EQ(b_values.get_data_at(0).to_string(), "ma"); |
| EXPECT_EQ(b_values.get_data_at(1).to_string(), "mb"); |
| EXPECT_EQ(b_values.get_data_at(2).to_string(), "mc"); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownMinMaxOnlyUsesSelectedRowGroupInFileRange) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_minmax_range_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {10, 1, 100}, {100, 10, 1000}, {"ten", "one", "hundred"}, |
| 1); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::MINMAX, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options_for_row_group_mid(file_path, 1)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 2); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| EXPECT_EQ(id_column.get_element(0), 1); |
| EXPECT_EQ(id_column.get_element(1), 1); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownCountOnlyUsesSelectedRowGroupInFileRange) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_count_range_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}, 1); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::COUNT, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options_for_row_group_mid(file_path, 2)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 1); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownCountFallsBackWithTableConjunct) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_count_conjunct_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {VExprContext::create_shared( |
| std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2))}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::COUNT, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 1); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| EXPECT_EQ(id_column.get_element(0), 3); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownCountFallsBackWithColumnPredicate) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_count_predicate_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}, 1); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| TableColumnPredicates column_predicates; |
| column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>( |
| 0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false)); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = std::move(column_predicates), |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::COUNT, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 1); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| EXPECT_EQ(id_column.get_element(0), 3); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, PushDownMinMaxFallsBackWithoutDirectFileMapping) { |
| const auto test_dir = std::filesystem::temp_directory_path() / |
| "doris_table_reader_minmax_missing_mapping_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_parquet_file(file_path, 1, "one"); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back( |
| make_table_column(99, "missing_id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::MINMAX, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 1); |
| EXPECT_EQ(block.get_by_position(0).column->get_int(0), 0); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_conjunct_filter_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_parquet_file(file_path, 3, "three"); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(1, "value", std::make_shared<DataTypeString>())); |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {VExprContext::create_shared( |
| std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2))}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| // open_reader() should convert the table-level conjunct on projected column id 0 into |
| // _table_filters before ColumnMapper creates the FileScanRequest. ColumnMapper then rewrites |
| // the conjunct's slot ref from table column id 0 to the file-local block position used by |
| // ParquetReader. The projection order intentionally puts value before id, so the id filter |
| // column is not at position 0 in the file block. |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(1).column); |
| ASSERT_EQ(id_column.size(), 1); |
| EXPECT_EQ(id_column.get_element(0), 3); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| |
| TableReader filtered_reader; |
| ASSERT_TRUE(filtered_reader |
| .init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {VExprContext::create_shared( |
| std::make_shared<TableInt32GreaterThanExpr>(0, 0, 4))}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| ASSERT_TRUE(filtered_reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| block = build_table_block(projected_columns); |
| eos = false; |
| ASSERT_TRUE(filtered_reader.get_block(&block, &eos).ok()); |
| EXPECT_TRUE(eos); |
| EXPECT_EQ(block.get_by_position(1).column->size(), 0); |
| |
| ASSERT_TRUE(filtered_reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, OpenReaderBuildsColumnPredicateFilters) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_column_predicate_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| // ColumnPredicate is only used for row-group/statistics pruning. Keep one row per row |
| // group so the predicate can prune the first two row groups and leave only id = 3. |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one", "two", "three"}, 1); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(2, "value", std::make_shared<DataTypeString>())); |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| TableColumnPredicates column_predicates; |
| column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>( |
| 0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false)); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = std::move(column_predicates), |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& value_column = assert_cast<const ColumnString&>(*block.get_by_position(0).column); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(1).column); |
| ASSERT_EQ(id_column.size(), 1); |
| ASSERT_EQ(value_column.size(), 1); |
| EXPECT_EQ(id_column.get_element(0), 3); |
| EXPECT_EQ(value_column.get_data_at(0).to_string(), "three"); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, ColumnPredicateSurvivesReopenSplit) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_predicate_reopen_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const std::vector<std::string> file_paths = { |
| (test_dir / "split_1.parquet").string(), |
| (test_dir / "split_2.parquet").string(), |
| }; |
| write_int_pair_parquet_file(file_paths[0], {1, 3}, {10, 30}, {"one", "three"}, 1); |
| write_int_pair_parquet_file(file_paths[1], {2, 4}, {20, 40}, {"two", "four"}, 1); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| TableColumnPredicates column_predicates; |
| column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>( |
| 0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false)); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = std::move(column_predicates), |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| std::vector<int32_t> ids; |
| for (const auto& file_path : file_paths) { |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| ASSERT_EQ(id_column.size(), 1); |
| ids.push_back(id_column.get_element(0)); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| } |
| |
| EXPECT_EQ(ids, std::vector<int32_t>({3, 4})); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, CreateScanRequestDeduplicatesSharedPredicateColumns) { |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const std::vector<TableColumn> projected_columns = { |
| make_table_column(0, "a", int_type), |
| make_table_column(1, "b", int_type), |
| make_table_column(2, "c", int_type), |
| make_table_column(3, "value", std::make_shared<DataTypeString>()), |
| }; |
| const std::vector<SchemaField> file_schema = { |
| {.id = 0, .name = "a", .type = int_type, .children = {}, .column_type = DATA_COLUMN}, |
| {.id = 1, .name = "b", .type = int_type, .children = {}, .column_type = DATA_COLUMN}, |
| {.id = 2, .name = "c", .type = int_type, .children = {}, .column_type = DATA_COLUMN}, |
| {.id = 3, |
| .name = "value", |
| .type = std::make_shared<DataTypeString>(), |
| .children = {}, |
| .column_type = DATA_COLUMN}, |
| }; |
| |
| TableColumnMapper mapper; |
| ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok()); |
| |
| std::vector<TableFilter> table_filters; |
| table_filters.push_back({ |
| .conjunct = VExprContext::create_shared( |
| std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1, 1, 1)), |
| .slot_ids = {0, 1}, |
| }); |
| table_filters.push_back({ |
| .conjunct = VExprContext::create_shared( |
| std::make_shared<TableInt32SumLessThanExpr>(0, 0, 2, 2, 3)), |
| .slot_ids = {0, 2}, |
| }); |
| |
| FileScanRequest file_request; |
| ASSERT_TRUE( |
| mapper.create_scan_request(table_filters, {}, projected_columns, &file_request).ok()); |
| |
| // Both filters reference column a. It must still be read once as a predicate column, and a |
| // predicate column must not be repeated as a non-predicate column. |
| EXPECT_EQ(projection_ids(file_request.predicate_columns), std::vector<ColumnId>({0, 1, 2})); |
| EXPECT_EQ(projection_ids(file_request.non_predicate_columns), std::vector<ColumnId>({3})); |
| ASSERT_EQ(file_request.column_positions.size(), 4); |
| EXPECT_EQ(file_request.column_positions.at(3), 0); |
| EXPECT_EQ(file_request.column_positions.at(0), 1); |
| EXPECT_EQ(file_request.column_positions.at(1), 2); |
| EXPECT_EQ(file_request.column_positions.at(2), 3); |
| const auto predicate_column_ids = projection_ids(file_request.predicate_columns); |
| const auto non_predicate_column_ids = projection_ids(file_request.non_predicate_columns); |
| for (const auto predicate_column_id : predicate_column_ids) { |
| EXPECT_TRUE(std::find(non_predicate_column_ids.begin(), non_predicate_column_ids.end(), |
| predicate_column_id) == non_predicate_column_ids.end()); |
| } |
| } |
| |
| TEST(TableReaderTest, CreateScanRequestPromotesProjectedColumnToPredicateColumn) { |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const std::vector<TableColumn> projected_columns = { |
| make_table_column(0, "id", int_type), |
| make_table_column(1, "score", int_type), |
| }; |
| const std::vector<SchemaField> file_schema = { |
| {.id = 0, .name = "id", .type = int_type, .children = {}, .column_type = DATA_COLUMN}, |
| {.id = 1, |
| .name = "score", |
| .type = int_type, |
| .children = {}, |
| .column_type = DATA_COLUMN}, |
| }; |
| |
| TableColumnMapper mapper; |
| ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok()); |
| |
| TableFilter table_filter { |
| .conjunct = VExprContext::create_shared( |
| std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)), |
| .slot_ids = {0}, |
| }; |
| |
| FileScanRequest file_request; |
| ASSERT_TRUE( |
| mapper.create_scan_request({table_filter}, {}, projected_columns, &file_request).ok()); |
| |
| EXPECT_EQ(projection_ids(file_request.predicate_columns), std::vector<ColumnId>({0})); |
| EXPECT_EQ(projection_ids(file_request.non_predicate_columns), std::vector<ColumnId>({1})); |
| ASSERT_EQ(file_request.column_positions.size(), 2); |
| EXPECT_EQ(file_request.column_positions.at(0), 1); |
| EXPECT_EQ(file_request.column_positions.at(1), 0); |
| } |
| |
| TEST(TableReaderTest, OpenReaderPushesMultiColumnConjunctToParquetReader) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_multi_conjunct_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one", "two", "three"}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(2, "value", std::make_shared<DataTypeString>())); |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| projected_columns.push_back(make_table_column(1, "score", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE( |
| reader |
| .init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {VExprContext::create_shared( |
| std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1, 1, 8))}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| // The conjunct references both id and score, so ColumnMapper must put both file columns into |
| // predicate_columns and rewrite both slot refs to ParquetReader's file-local block positions. |
| // ParquetReader then evaluates the expression after all predicate columns have been read. |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& value_column = assert_cast<const ColumnString&>(*block.get_by_position(0).column); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(1).column); |
| const auto& score_column = assert_cast<const ColumnInt32&>(*block.get_by_position(2).column); |
| ASSERT_EQ(id_column.size(), 1); |
| ASSERT_EQ(score_column.size(), 1); |
| ASSERT_EQ(value_column.size(), 1); |
| EXPECT_EQ(id_column.get_element(0), 3); |
| EXPECT_EQ(score_column.get_element(0), 8); |
| EXPECT_EQ(value_column.get_data_at(0).to_string(), "three"); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, ProjectedColumnsFillDefaultForParquetSchemaMismatch) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_schema_mismatch_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_parquet_file(file_path, 1, "one"); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back( |
| make_table_column(99, "missing_value", std::make_shared<DataTypeString>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| // The table projection asks for field id 99, but the ParquetReader exposes only file-local |
| // fields 0 and 1. Missing columns are allowed by the current mapper options, so TableReader |
| // should still use the Parquet row count and fill a default column in table schema. |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| EXPECT_EQ(block.get_by_position(0).column->size(), 1); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColumnsDisallowed) { |
| const auto test_dir = std::filesystem::temp_directory_path() / |
| "doris_table_reader_schema_mismatch_reject_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_parquet_file(file_path, 1, "one"); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back( |
| make_table_column(99, "missing_value", std::make_shared<DataTypeString>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = false, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| // With allow_missing_columns disabled, the same missing projected column should fail while |
| // opening the split instead of being materialized as a default column. |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| const auto status = reader.get_block(&block, &eos); |
| ASSERT_FALSE(status.ok()); |
| EXPECT_NE(status.to_string().find("does not have a matching file column"), std::string::npos); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, ProjectedStructFillsMissingChildWithDefault) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_struct_missing_child_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_struct_parquet_file(file_path, 7); |
| |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const auto string_type = std::make_shared<DataTypeString>(); |
| auto id_child = make_table_column(0, "id", int_type); |
| auto missing_child = make_table_column(99, "missing_child", string_type); |
| auto struct_type = std::make_shared<DataTypeStruct>(DataTypes {int_type, string_type}, |
| Strings {"id", "missing_child"}); |
| auto struct_column = make_table_column(100, "s", struct_type); |
| struct_column.children = {id_child, missing_child}; |
| const std::vector<TableColumn> projected_columns = {struct_column}; |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& struct_result = assert_cast<const ColumnStruct&>(*block.get_by_position(0).column); |
| ASSERT_EQ(struct_result.get_columns().size(), 2); |
| const auto& ids = assert_cast<const ColumnInt32&>(struct_result.get_column(0)); |
| const auto& missing_values = assert_cast<const ColumnString&>(struct_result.get_column(1)); |
| ASSERT_EQ(struct_result.size(), 1); |
| EXPECT_EQ(ids.get_element(0), 7); |
| EXPECT_EQ(missing_values.get_data_at(0).to_string(), ""); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, ProjectedPartitionColumnUsesSplitPartitionValue) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_partition_value_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_parquet_file(file_path, 1, "one"); |
| |
| std::vector<TableColumn> projected_columns; |
| auto partition_column = make_table_column(1, "value", std::make_shared<DataTypeString>()); |
| partition_column.is_partition_key = true; |
| projected_columns.push_back(std::move(partition_column)); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.partition_values.emplace("value", Field::create_field<TYPE_STRING>("p1")); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| // The file has a physical column with the same id/name. The split partition value should still |
| // take precedence and be materialized by TableReader. |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& partition_value = |
| assert_cast<const ColumnString&>(*block.get_by_position(0).column); |
| ASSERT_EQ(partition_value.size(), 1); |
| EXPECT_EQ(partition_value.get_data_at(0).to_string(), "p1"); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergVirtualColumnsUseRowLineageMetadata) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_virtual_columns_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back( |
| make_table_column(100, "_row_id", make_nullable(std::make_shared<DataTypeInt64>()))); |
| projected_columns.push_back( |
| make_table_column(101, "_last_updated_sequence_number", |
| make_nullable(std::make_shared<DataTypeInt64>()))); |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {VExprContext::create_shared( |
| std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1))}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| set_iceberg_row_lineage_params(&split_options, 1000, 77); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(2).column); |
| |
| ASSERT_EQ(block.rows(), 2); |
| EXPECT_EQ(id_column.get_element(0), 2); |
| EXPECT_EQ(id_column.get_element(1), 3); |
| expect_nullable_int64_column_values(*block.get_by_position(0).column, {1001, 1002}); |
| expect_nullable_int64_column_values(*block.get_by_position(1).column, {77, 77}); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergVirtualColumnsKeepRowLineageAfterConjunctFiltering) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_virtual_columns_conjunct_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back( |
| make_table_column(100, "_row_id", make_nullable(std::make_shared<DataTypeInt64>()))); |
| projected_columns.push_back( |
| make_table_column(101, "_last_updated_sequence_number", |
| make_nullable(std::make_shared<DataTypeInt64>()))); |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {VExprContext::create_shared( |
| std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1))}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| set_iceberg_row_lineage_params(&split_options, 3000, 88); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(2).column); |
| |
| ASSERT_EQ(block.rows(), 2); |
| EXPECT_EQ(id_column.get_element(0), 2); |
| EXPECT_EQ(id_column.get_element(1), 3); |
| expect_nullable_int64_column_values(*block.get_by_position(0).column, {3001, 3002}); |
| expect_nullable_int64_column_values(*block.get_by_position(1).column, {88, 88}); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergVirtualColumnsKeepRowLineageAfterRowGroupPredicatePruning) { |
| const auto test_dir = std::filesystem::temp_directory_path() / |
| "doris_iceberg_virtual_columns_row_group_predicate_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| // ColumnPredicate is used for row-group/statistics pruning. Keep one row per row group so |
| // id > 2 prunes the first two row groups and leaves only the third file-local row. |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}, 1); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back( |
| make_table_column(100, "_row_id", make_nullable(std::make_shared<DataTypeInt64>()))); |
| projected_columns.push_back( |
| make_table_column(101, "_last_updated_sequence_number", |
| make_nullable(std::make_shared<DataTypeInt64>()))); |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| TableColumnPredicates column_predicates; |
| column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>( |
| 0, "id", std::make_shared<DataTypeInt32>(), Field::create_field<TYPE_INT>(2), false)); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = std::move(column_predicates), |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| set_iceberg_row_lineage_params(&split_options, 4000, 99); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(2).column); |
| |
| ASSERT_EQ(block.rows(), 1); |
| EXPECT_EQ(id_column.get_element(0), 3); |
| expect_nullable_int64_column_values(*block.get_by_position(0).column, {4002}); |
| expect_nullable_int64_column_values(*block.get_by_position(1).column, {99}); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergDeletionVectorUsesTableReaderDeleteFileInterface) { |
| TTableFormatFileDesc table_format_desc; |
| TIcebergFileDesc iceberg_desc; |
| iceberg_desc.__set_format_version(2); |
| iceberg_desc.__set_delete_files({make_iceberg_deletion_vector("dv.bin", 8, 128)}); |
| table_format_desc.__set_iceberg_params(iceberg_desc); |
| |
| IcebergTableReaderDeleteFileTestHelper reader; |
| DeleteFileDesc desc; |
| bool has_delete_file = false; |
| ASSERT_TRUE(reader.parse_deletion_vector_file(table_format_desc, &desc, &has_delete_file).ok()); |
| |
| EXPECT_TRUE(has_delete_file); |
| EXPECT_EQ(desc.path, "dv.bin"); |
| EXPECT_EQ(desc.start_offset, 8); |
| EXPECT_EQ(desc.size, 128); |
| EXPECT_EQ(desc.file_size, -1); |
| EXPECT_EQ(desc.format, DeleteFileDesc::Format::ICEBERG); |
| } |
| |
| TEST(TableReaderTest, IcebergDeletionVectorRejectsMultipleDeleteFiles) { |
| TTableFormatFileDesc table_format_desc; |
| TIcebergFileDesc iceberg_desc; |
| iceberg_desc.__set_format_version(2); |
| iceberg_desc.__set_delete_files({make_iceberg_deletion_vector("dv-a.bin", 8, 128), |
| make_iceberg_deletion_vector("dv-b.bin", 16, 256)}); |
| table_format_desc.__set_iceberg_params(iceberg_desc); |
| |
| IcebergTableReaderDeleteFileTestHelper reader; |
| DeleteFileDesc desc; |
| bool has_delete_file = false; |
| auto status = reader.parse_deletion_vector_file(table_format_desc, &desc, &has_delete_file); |
| |
| EXPECT_FALSE(status.ok()); |
| } |
| |
| TEST(TableReaderTest, IcebergTableReaderAppliesDeletionVectorFile) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_deletion_vector_file_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| const auto dv_path = (test_dir / "delete-vector.bin").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 50}, |
| {"one", "two", "three", "four", "five"}); |
| const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0, 4}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeProfile profile("test_profile"); |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| auto scan_params = make_local_parquet_scan_params(); |
| io::FileReaderStats file_reader_stats; |
| io::FileCacheStatistics file_cache_stats; |
| auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); |
| ShardedKVCache cache(1); |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = &scan_params, |
| .io_ctx = io_ctx, |
| .runtime_state = &state, |
| .scanner_profile = &profile, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::COUNT, |
| .profile = make_table_read_profile(&profile), |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.cache = &cache; |
| split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( |
| file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size)})); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector<int32_t>({2, 3, 4})); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithDeletes) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_aggregate_delete_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| const auto dv_path = (test_dir / "delete-vector.bin").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeProfile profile("test_profile"); |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| auto scan_params = make_local_parquet_scan_params(); |
| io::FileReaderStats file_reader_stats; |
| io::FileCacheStatistics file_cache_stats; |
| auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); |
| ShardedKVCache cache(1); |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = &scan_params, |
| .io_ctx = io_ctx, |
| .runtime_state = &state, |
| .scanner_profile = &profile, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::COUNT, |
| .profile = make_table_read_profile(&profile), |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.cache = &cache; |
| split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( |
| file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size)})); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 2); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| EXPECT_EQ(id_column.get_element(0), 2); |
| EXPECT_EQ(id_column.get_element(1), 3); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithPositionDelete) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_aggregate_position_delete_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| const auto delete_file_path = (test_dir / "position-delete.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| write_position_delete_parquet_file(delete_file_path, {file_path}, {1}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeProfile profile("test_profile"); |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| auto scan_params = make_local_parquet_scan_params(); |
| io::FileReaderStats file_reader_stats; |
| io::FileCacheStatistics file_cache_stats; |
| auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); |
| ShardedKVCache cache(1); |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = &scan_params, |
| .io_ctx = io_ctx, |
| .runtime_state = &state, |
| .scanner_profile = &profile, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::COUNT, |
| .profile = make_table_read_profile(&profile), |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.cache = &cache; |
| split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( |
| file_path, {make_iceberg_position_delete_file(delete_file_path)})); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 2); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| EXPECT_EQ(id_column.get_element(0), 1); |
| EXPECT_EQ(id_column.get_element(1), 3); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergPositionDeleteFallsBackToSplitPath) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_position_delete_path_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| const auto delete_file_path = (test_dir / "position-delete.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| write_position_delete_parquet_file(delete_file_path, {file_path}, {1}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeProfile profile("test_profile"); |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| auto scan_params = make_local_parquet_scan_params(); |
| io::FileReaderStats file_reader_stats; |
| io::FileCacheStatistics file_cache_stats; |
| auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); |
| ShardedKVCache cache(1); |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = &scan_params, |
| .io_ctx = io_ctx, |
| .runtime_state = &state, |
| .scanner_profile = &profile, |
| .allow_missing_columns = true, |
| .profile = make_table_read_profile(&profile), |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.cache = &cache; |
| TTableFormatFileDesc table_format_params; |
| TIcebergFileDesc iceberg_params; |
| iceberg_params.__set_format_version(2); |
| iceberg_params.__set_delete_files({make_iceberg_position_delete_file(delete_file_path)}); |
| table_format_params.__set_iceberg_params(iceberg_params); |
| split_options.current_range.__set_table_format_params(table_format_params); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector<int32_t>({1, 3})); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithEqualityDelete) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_aggregate_equality_delete_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| const auto delete_file_path = (test_dir / "equality-delete.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| write_iceberg_equality_delete_parquet_file(delete_file_path, 0, 2); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeProfile profile("test_profile"); |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| auto scan_params = make_local_parquet_scan_params(); |
| io::FileReaderStats file_reader_stats; |
| io::FileCacheStatistics file_cache_stats; |
| auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); |
| ShardedKVCache cache(1); |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = &scan_params, |
| .io_ctx = io_ctx, |
| .runtime_state = &state, |
| .scanner_profile = &profile, |
| .allow_missing_columns = true, |
| .push_down_agg_type = TPushAggOp::type::COUNT, |
| .profile = make_table_read_profile(&profile), |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.cache = &cache; |
| split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( |
| file_path, {make_iceberg_equality_delete_file(delete_file_path, {0})})); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| ASSERT_EQ(block.rows(), 2); |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| EXPECT_EQ(id_column.get_element(0), 1); |
| EXPECT_EQ(id_column.get_element(1), 3); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergEqualityDeleteCastsDataColumnToDeleteKeyType) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_equality_delete_cast_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| const auto delete_file_path = (test_dir / "equality-delete.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| write_iceberg_equality_delete_bigint_parquet_file(delete_file_path, 0, 2); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeProfile profile("test_profile"); |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| auto scan_params = make_local_parquet_scan_params(); |
| io::FileReaderStats file_reader_stats; |
| io::FileCacheStatistics file_cache_stats; |
| auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); |
| ShardedKVCache cache(1); |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = &scan_params, |
| .io_ctx = io_ctx, |
| .runtime_state = &state, |
| .scanner_profile = &profile, |
| .allow_missing_columns = true, |
| .profile = make_table_read_profile(&profile), |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.cache = &cache; |
| split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( |
| file_path, {make_iceberg_equality_delete_file(delete_file_path, {0})})); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector<int32_t>({1, 3})); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, IcebergPositionDeleteOnlyMatchesOriginalDataFilePath) { |
| const auto test_dir = std::filesystem::temp_directory_path() / |
| "doris_iceberg_position_delete_path_match_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| const auto other_file_path = (test_dir / "other.parquet").string(); |
| const auto delete_file_path = (test_dir / "position-delete.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"}); |
| write_position_delete_parquet_file(delete_file_path, {other_file_path, file_path}, {0, 1}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeProfile profile("test_profile"); |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| auto scan_params = make_local_parquet_scan_params(); |
| io::FileReaderStats file_reader_stats; |
| io::FileCacheStatistics file_cache_stats; |
| auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); |
| ShardedKVCache cache(1); |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = &scan_params, |
| .io_ctx = io_ctx, |
| .runtime_state = &state, |
| .scanner_profile = &profile, |
| .allow_missing_columns = true, |
| .profile = make_table_read_profile(&profile), |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.cache = &cache; |
| split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( |
| file_path, {make_iceberg_position_delete_file(delete_file_path)})); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector<int32_t>({1, 3})); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
TEST(TableReaderTest, IcebergRowLineageRemainsFileLocalAfterDeleteFiltering) {
    // Checks that `_row_id` values come from the split's first_row_id plus each row's ORIGINAL
    // position in the data file. After the position delete removes file row 1, the surviving
    // rows keep lineage ids 1000 and 1002 rather than being renumbered to 1000 and 1001.
    const auto test_dir =
            std::filesystem::temp_directory_path() / "doris_iceberg_row_lineage_delete_test";
    std::filesystem::remove_all(test_dir);
    std::filesystem::create_directories(test_dir);

    const auto file_path = (test_dir / "split.parquet").string();
    const auto delete_file_path = (test_dir / "position-delete.parquet").string();
    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", "two", "three"});
    // Delete position 1 of this data file (the row with id == 2).
    write_position_delete_parquet_file(delete_file_path, {file_path}, {1});

    // `_row_id` (field id 100, nullable BIGINT) is projected first so it lands at block
    // position 0; `id` follows at position 1.
    std::vector<TableColumn> projected_columns;
    projected_columns.push_back(
            make_table_column(100, "_row_id", make_nullable(std::make_shared<DataTypeInt64>())));
    projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));

    RuntimeProfile profile("test_profile");
    RuntimeState state {TQueryOptions(), TQueryGlobals()};
    auto scan_params = make_local_parquet_scan_params();
    io::FileReaderStats file_reader_stats;
    io::FileCacheStatistics file_cache_stats;
    auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
    ShardedKVCache cache(1);
    doris::iceberg::IcebergTableReader reader;
    ASSERT_TRUE(reader.init({
                                    .projected_columns = projected_columns,
                                    .column_predicates = {},
                                    .conjuncts = {},
                                    .format = FileFormat::PARQUET,
                                    .scan_params = &scan_params,
                                    .io_ctx = io_ctx,
                                    .runtime_state = &state,
                                    .scanner_profile = &profile,
                                    .allow_missing_columns = true,
                                    .profile = make_table_read_profile(&profile),
                            })
                        .ok());

    auto split_options = build_split_options(file_path);
    split_options.cache = &cache;
    TTableFormatFileDesc table_format_params = make_iceberg_table_format_desc(
            file_path, {make_iceberg_position_delete_file(delete_file_path)});
    // Seed for lineage ids: per the expectations below, the row at file position p gets 1000 + p.
    table_format_params.iceberg_params.__set_first_row_id(1000);
    split_options.current_range.__set_table_format_params(table_format_params);
    ASSERT_TRUE(reader.prepare_split(split_options).ok());

    Block block = build_table_block(projected_columns);
    bool eos = false;
    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
    ASSERT_FALSE(eos);
    ASSERT_EQ(block.rows(), 2);
    // Surviving file positions are 0 and 2, so the lineage ids skip 1001.
    expect_nullable_int64_column_values(*block.get_by_position(0).column, {1000, 1002});
    const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(1).column);
    EXPECT_EQ(id_column.get_element(0), 1);
    EXPECT_EQ(id_column.get_element(1), 3);

    ASSERT_TRUE(reader.close().ok());
    std::filesystem::remove_all(test_dir);
}
| |
| TEST(TableReaderTest, IcebergTableReaderAppliesPositionDeleteFile) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_iceberg_position_delete_file_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| const auto delete_file_path = (test_dir / "position-delete.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 50}, |
| {"one", "two", "three", "four", "five"}); |
| write_position_delete_parquet_file(delete_file_path, {file_path, file_path}, {1, 3}); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeProfile profile("test_profile"); |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| auto scan_params = make_local_parquet_scan_params(); |
| io::FileReaderStats file_reader_stats; |
| io::FileCacheStatistics file_cache_stats; |
| auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats); |
| ShardedKVCache cache(1); |
| doris::iceberg::IcebergTableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = &scan_params, |
| .io_ctx = io_ctx, |
| .runtime_state = &state, |
| .scanner_profile = &profile, |
| .allow_missing_columns = true, |
| .profile = make_table_read_profile(&profile), |
| }) |
| .ok()); |
| |
| auto split_options = build_split_options(file_path); |
| split_options.cache = &cache; |
| split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc( |
| file_path, {make_iceberg_position_delete_file(delete_file_path)})); |
| ASSERT_TRUE(reader.prepare_split(split_options).ok()); |
| |
| EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector<int32_t>({1, 3, 5})); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
TEST(TableReaderTest, IcebergTableReaderMergesDeletionVectorAndPositionDeleteFiles) {
    // Two delete sources target the same data file:
    //   * a deletion vector marking position 0 (id 1), and
    //   * a position-delete file listing position 3 (id 4) twice.
    // The reader must apply the union of both. The duplicated entry must neither fail the read
    // nor remove any extra row, leaving ids {2, 3, 5}.
    const auto test_dir =
            std::filesystem::temp_directory_path() / "doris_iceberg_delete_files_merge_test";
    std::filesystem::remove_all(test_dir);
    std::filesystem::create_directories(test_dir);

    const auto file_path = (test_dir / "split.parquet").string();
    const auto dv_path = (test_dir / "delete-vector.bin").string();
    const auto position_delete_path = (test_dir / "position-delete.parquet").string();
    write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 50},
                                {"one", "two", "three", "four", "five"});
    // The returned size is forwarded to make_iceberg_deletion_vector below; presumably it is the
    // byte length of the DV blob that starts at offset 0 (TODO confirm against the helper).
    const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0});
    // Position 3 appears twice on purpose, to exercise duplicate position-delete entries.
    write_position_delete_parquet_file(position_delete_path, {file_path, file_path}, {3, 3});

    std::vector<TableColumn> projected_columns;
    projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>()));

    RuntimeProfile profile("test_profile");
    RuntimeState state {TQueryOptions(), TQueryGlobals()};
    auto scan_params = make_local_parquet_scan_params();
    io::FileReaderStats file_reader_stats;
    io::FileCacheStatistics file_cache_stats;
    auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
    ShardedKVCache cache(1);
    doris::iceberg::IcebergTableReader reader;
    ASSERT_TRUE(reader.init({
                                    .projected_columns = projected_columns,
                                    .column_predicates = {},
                                    .conjuncts = {},
                                    .format = FileFormat::PARQUET,
                                    .scan_params = &scan_params,
                                    .io_ctx = io_ctx,
                                    .runtime_state = &state,
                                    .scanner_profile = &profile,
                                    .allow_missing_columns = true,
                                    .profile = make_table_read_profile(&profile),
                            })
                        .ok());

    auto split_options = build_split_options(file_path);
    split_options.cache = &cache;
    // Both delete sources are attached to the same split.
    split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
            file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size),
                        make_iceberg_position_delete_file(position_delete_path)}));
    ASSERT_TRUE(reader.prepare_split(split_options).ok());

    // Position 0 is removed by the DV and position 3 by the position-delete file.
    EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), std::vector<int32_t>({2, 3, 5}));

    ASSERT_TRUE(reader.close().ok());
    std::filesystem::remove_all(test_dir);
}
| |
| TEST(TableReaderTest, RowPositionDeletePredicateColumnIsNotRepeatedAsOutputColumn) { |
| const auto row_position_column_id = |
| doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID; |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back( |
| make_table_column(100, "_row_id", make_nullable(std::make_shared<DataTypeInt64>()))); |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| |
| IcebergTableReaderScanRequestTestHelper reader; |
| ASSERT_TRUE(reader.init_for_scan_request_test(projected_columns).ok()); |
| |
| FileScanRequest request; |
| request.non_predicate_columns.push_back(field_projection(0)); |
| request.column_positions.emplace(0, 0); |
| |
| ASSERT_TRUE(reader.customize_request(&request).ok()); |
| |
| EXPECT_EQ(projection_ids(request.predicate_columns), |
| std::vector<ColumnId>({row_position_column_id})); |
| EXPECT_EQ(projection_ids(request.non_predicate_columns), std::vector<ColumnId>({0})); |
| ASSERT_TRUE(request.column_positions.contains(row_position_column_id)); |
| EXPECT_EQ(request.column_positions.at(row_position_column_id), 1); |
| ASSERT_TRUE(request.conjuncts.empty()); |
| ASSERT_EQ(request.delete_conjuncts.size(), 1); |
| EXPECT_NE(request.delete_conjuncts[0], nullptr); |
| } |
| |
| TEST(TableReaderTest, ParquetReaderReadsOnlyRowGroupsInFileRange) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_file_range_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, |
| {"range_group_one", "range_group_two", "range_group_three"}, 1); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(0, "id", std::make_shared<DataTypeInt32>())); |
| projected_columns.push_back(make_table_column(2, "value", std::make_shared<DataTypeString>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options_for_row_group_mid(file_path, 1)).ok()); |
| |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| const auto& value_column = assert_cast<const ColumnString&>(*block.get_by_position(1).column); |
| ASSERT_EQ(block.rows(), 1); |
| EXPECT_EQ(id_column.get_element(0), 2); |
| EXPECT_EQ(value_column.get_data_at(0).to_string(), "range_group_two"); |
| |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| EXPECT_TRUE(eos); |
| EXPECT_EQ(block.rows(), 0); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionForSameNameDifferentIdParquetSchema) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_same_name_diff_id_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_parquet_file(file_path, 1, "one"); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back(make_table_column(99, "id", std::make_shared<DataTypeInt32>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| // The table column has the same name as the Parquet field, but a different field id. |
| // ColumnMapper should still resolve it by name and build a SlotRef projection from the file |
| // column into the requested table column. |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column); |
| ASSERT_EQ(id_column.size(), 1); |
| EXPECT_EQ(id_column.get_element(0), 1); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismatch) { |
| const auto test_dir = |
| std::filesystem::temp_directory_path() / "doris_table_reader_mapper_expr_test"; |
| std::filesystem::remove_all(test_dir); |
| std::filesystem::create_directories(test_dir); |
| |
| const auto file_path = (test_dir / "split.parquet").string(); |
| write_parquet_file(file_path, 7, "seven"); |
| |
| std::vector<TableColumn> projected_columns; |
| projected_columns.push_back( |
| make_table_column(0, "table_id", std::make_shared<DataTypeInt64>())); |
| projected_columns.push_back( |
| make_table_column(1, "table_value", std::make_shared<DataTypeString>())); |
| |
| RuntimeState state {TQueryOptions(), TQueryGlobals()}; |
| TableReader reader; |
| ASSERT_TRUE(reader.init({ |
| .projected_columns = projected_columns, |
| .column_predicates = {}, |
| .conjuncts = {}, |
| .format = FileFormat::PARQUET, |
| .scan_params = nullptr, |
| .io_ctx = nullptr, |
| .runtime_state = &state, |
| .scanner_profile = nullptr, |
| .allow_missing_columns = true, |
| .profile = nullptr, |
| }) |
| .ok()); |
| |
| ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok()); |
| |
| // The table projection is intentionally different from the Parquet schema: |
| // field id 0 is requested as BIGINT instead of the file INT, so ColumnMapper should build a |
| // Cast expression; field id 1 has a different table name but the same type, so it should build |
| // a SlotRef projection. Both columns should still materialize in table schema order. |
| Block block = build_table_block(projected_columns); |
| bool eos = false; |
| ASSERT_TRUE(reader.get_block(&block, &eos).ok()); |
| ASSERT_FALSE(eos); |
| |
| ASSERT_EQ(block.get_by_position(0).name, "table_id"); |
| ASSERT_EQ(block.get_by_position(1).name, "table_value"); |
| const auto& id_column = assert_cast<const ColumnInt64&>(*block.get_by_position(0).column); |
| const auto& value_column = assert_cast<const ColumnString&>(*block.get_by_position(1).column); |
| ASSERT_EQ(id_column.size(), 1); |
| ASSERT_EQ(value_column.size(), 1); |
| EXPECT_EQ(id_column.get_element(0), 7); |
| EXPECT_EQ(value_column.get_data_at(0).to_string(), "seven"); |
| |
| ASSERT_TRUE(reader.close().ok()); |
| std::filesystem::remove_all(test_dir); |
| } |
| |
| // --------------------------------------------------------------------------- |
| // BY_INDEX (Hive1 / hive_*_use_column_names=false) column mapping tests. |
| // These cases exercise `TableColumnMapper::create_mapping` directly to verify top-level |
| // file-position matching semantics, where `TableColumn::id` is interpreted as the 0-based file |
// column position in this mode.
// They build the file schema in memory and never read a real file.
| // --------------------------------------------------------------------------- |
| |
| namespace { |
| |
| // In BY_INDEX mode, `TableColumn::id` directly represents the position of the column in |
| // `file_schema`. This helper packages `file_index + display name` into one TableColumn. |
| TableColumn make_index_table_column(int32_t file_index, const std::string& name, |
| const DataTypePtr& type) { |
| return make_table_column(file_index, name, type); |
| } |
| |
| } // namespace |
| |
| TEST(TableColumnMapperByIndexTest, MapsTopLevelColumnsByPositionIgnoringFileNames) { |
| // Simulate Hive1 ORC: all file schema names are placeholder values such as `_col0` / `_col1` |
| // / `_col2`, so table columns must match by position instead of name. The table projects all |
| // three file columns in order. |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const auto str_type = std::make_shared<DataTypeString>(); |
| const std::vector<TableColumn> projected_columns = { |
| make_index_table_column(0, "user_id", int_type), |
| make_index_table_column(1, "user_name", str_type), |
| make_index_table_column(2, "age", int_type), |
| }; |
| const std::vector<SchemaField> file_schema = { |
| make_schema_field(0, "_col0", int_type), |
| make_schema_field(1, "_col1", str_type), |
| make_schema_field(2, "_col2", int_type), |
| }; |
| |
| TableColumnMapperOptions options; |
| options.mode = TableColumnMappingMode::BY_INDEX; |
| TableColumnMapper mapper(options); |
| ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok()); |
| |
| const auto& mappings = mapper.mappings(); |
| ASSERT_EQ(mappings.size(), 3); |
| |
| ASSERT_TRUE(mappings[0].field_id.has_value()); |
| EXPECT_EQ(*mappings[0].field_id, 0); |
| EXPECT_EQ(mappings[0].file_column_name, "_col0"); |
| EXPECT_FALSE(mappings[0].is_constant); |
| |
| ASSERT_TRUE(mappings[1].field_id.has_value()); |
| EXPECT_EQ(*mappings[1].field_id, 1); |
| EXPECT_EQ(mappings[1].file_column_name, "_col1"); |
| |
| ASSERT_TRUE(mappings[2].field_id.has_value()); |
| EXPECT_EQ(*mappings[2].field_id, 2); |
| EXPECT_EQ(mappings[2].file_column_name, "_col2"); |
| } |
| |
| TEST(TableColumnMapperByIndexTest, SparseProjectionMapsByExplicitFileIndex) { |
| // Only project the 2nd and 4th table columns (mapped to `_col2` and `_col4` in the file). |
| // BY_INDEX must support sparse projection: file position is determined only by |
| // `table_column.id`, independent of the relative order inside `projected_columns`. |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const std::vector<TableColumn> projected_columns = { |
| make_index_table_column(2, "age", int_type), |
| make_index_table_column(4, "score", int_type), |
| }; |
| const std::vector<SchemaField> file_schema = { |
| make_schema_field(0, "_col0", int_type), make_schema_field(1, "_col1", int_type), |
| make_schema_field(2, "_col2", int_type), make_schema_field(3, "_col3", int_type), |
| make_schema_field(4, "_col4", int_type), |
| }; |
| |
| TableColumnMapperOptions options; |
| options.mode = TableColumnMappingMode::BY_INDEX; |
| TableColumnMapper mapper(options); |
| ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok()); |
| |
| const auto& mappings = mapper.mappings(); |
| ASSERT_EQ(mappings.size(), 2); |
| ASSERT_TRUE(mappings[0].field_id.has_value()); |
| EXPECT_EQ(*mappings[0].field_id, 2); |
| EXPECT_EQ(mappings[0].file_column_name, "_col2"); |
| |
| ASSERT_TRUE(mappings[1].field_id.has_value()); |
| EXPECT_EQ(*mappings[1].field_id, 4); |
| EXPECT_EQ(mappings[1].file_column_name, "_col4"); |
| } |
| |
| TEST(TableColumnMapperByIndexTest, PartitionColumnsTakeConstantAndDoNotConsumeFileIndex) { |
| // In BY_INDEX mode, partition columns should take the constant branch using |
| // `partition_values` and stay completely independent from file schema. Data columns still |
| // index into file positions through `table_column.id`. |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const auto str_type = std::make_shared<DataTypeString>(); |
| |
| auto pt_col = make_table_column(/*id=*/-1, "dt", str_type); |
| pt_col.is_partition_key = true; |
| const std::vector<TableColumn> projected_columns = { |
| pt_col, // partition column first |
| make_index_table_column(0, "user_id", int_type), |
| make_index_table_column(1, "score", int_type), |
| }; |
| const std::vector<SchemaField> file_schema = { |
| make_schema_field(0, "_col0", int_type), |
| make_schema_field(1, "_col1", int_type), |
| }; |
| |
| std::map<std::string, Field> partition_values; |
| partition_values.emplace("dt", Field::create_field<TYPE_STRING>("2025-06-01")); |
| |
| TableColumnMapperOptions options; |
| options.mode = TableColumnMappingMode::BY_INDEX; |
| TableColumnMapper mapper(options); |
| ASSERT_TRUE(mapper.create_mapping(projected_columns, partition_values, file_schema).ok()); |
| |
| const auto& mappings = mapper.mappings(); |
| ASSERT_EQ(mappings.size(), 3); |
| |
| EXPECT_TRUE(mappings[0].is_constant); |
| EXPECT_FALSE(mappings[0].field_id.has_value()); |
| EXPECT_NE(mappings[0].default_expr, nullptr); |
| |
| ASSERT_TRUE(mappings[1].field_id.has_value()); |
| EXPECT_EQ(*mappings[1].field_id, 0); |
| EXPECT_EQ(mappings[1].file_column_name, "_col0"); |
| |
| ASSERT_TRUE(mappings[2].field_id.has_value()); |
| EXPECT_EQ(*mappings[2].field_id, 1); |
| EXPECT_EQ(mappings[2].file_column_name, "_col1"); |
| } |
| |
| TEST(TableColumnMapperByIndexTest, FileIndexOutOfRangeFallsBackToDefaultOrMissing) { |
| // The declared file_index is outside file schema bounds. With `default_expr`, the mapping |
| // takes the constant branch. Without a default and with `allow_missing_columns=true`, it |
| // falls back to the missing-column branch without error and without a file mapping. |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| |
| auto with_default = make_index_table_column(5, "extra_default", int_type); |
| auto literal_expr = VExprContext::create_shared( |
| TableLiteral::create_shared(int_type, Field::create_field<TYPE_INT>(42))); |
| with_default.default_expr = literal_expr; |
| |
| const std::vector<TableColumn> projected_columns = { |
| make_index_table_column(0, "a", int_type), |
| with_default, // out-of-range file_index + default |
| make_index_table_column(99, "extra_missing", int_type), // out-of-range without default |
| }; |
| const std::vector<SchemaField> file_schema = { |
| make_schema_field(0, "_col0", int_type), |
| make_schema_field(1, "_col1", int_type), |
| }; |
| |
| TableColumnMapperOptions options; |
| options.mode = TableColumnMappingMode::BY_INDEX; |
| options.allow_missing_columns = true; |
| TableColumnMapper mapper(options); |
| ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok()); |
| |
| const auto& mappings = mapper.mappings(); |
| ASSERT_EQ(mappings.size(), 3); |
| |
| ASSERT_TRUE(mappings[0].field_id.has_value()); |
| EXPECT_EQ(*mappings[0].field_id, 0); |
| |
| EXPECT_FALSE(mappings[1].field_id.has_value()); |
| EXPECT_TRUE(mappings[1].is_constant); |
| EXPECT_EQ(mappings[1].default_expr, literal_expr); |
| |
| EXPECT_FALSE(mappings[2].field_id.has_value()); |
| EXPECT_FALSE(mappings[2].is_constant); |
| EXPECT_EQ(mappings[2].default_expr, nullptr); |
| } |
| |
| TEST(TableColumnMapperByIndexTest, FileIndexOutOfRangeRejectedWhenAllowMissingFalse) { |
| // When allow_missing_columns=false, an out-of-range file_index without a default must fail. |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const std::vector<TableColumn> projected_columns = { |
| make_index_table_column(0, "a", int_type), |
| make_index_table_column(5, "b", int_type), // out of range and no default |
| }; |
| const std::vector<SchemaField> file_schema = { |
| make_schema_field(0, "_col0", int_type), |
| }; |
| |
| TableColumnMapperOptions options; |
| options.mode = TableColumnMappingMode::BY_INDEX; |
| options.allow_missing_columns = false; |
| TableColumnMapper mapper(options); |
| const auto status = mapper.create_mapping(projected_columns, {}, file_schema); |
| EXPECT_FALSE(status.ok()); |
| } |
| |
| TEST(TableColumnMapperByIndexTest, ExtraFileColumnsAreSimplyIgnored) { |
| // The file may contain more columns than the table projects. Any file column that is not |
| // referenced by a table column should simply be ignored without affecting the mapping. |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const std::vector<TableColumn> projected_columns = { |
| make_index_table_column(0, "a", int_type), |
| }; |
| const std::vector<SchemaField> file_schema = { |
| make_schema_field(0, "_col0", int_type), |
| make_schema_field(1, "_col1", int_type), |
| make_schema_field(2, "_col2", int_type), |
| }; |
| |
| TableColumnMapperOptions options; |
| options.mode = TableColumnMappingMode::BY_INDEX; |
| TableColumnMapper mapper(options); |
| ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok()); |
| |
| const auto& mappings = mapper.mappings(); |
| ASSERT_EQ(mappings.size(), 1); |
| ASSERT_TRUE(mappings[0].field_id.has_value()); |
| EXPECT_EQ(*mappings[0].field_id, 0); |
| } |
| |
| TEST(TableColumnMapperByIndexTest, IgnoresFileColumnNames) { |
| // BY_INDEX ignores file column names completely. Even if a file column name appears to match a |
| // table column name, the mapping must still follow the position specified by |
| // `table_column.id`. |
| const auto int_type = std::make_shared<DataTypeInt32>(); |
| const std::vector<TableColumn> projected_columns = { |
| // The table wants column "a", but file_index=1 means it should map to file column 1 |
| // (named "b"), not file column 0 that happens to be named "a". |
| make_index_table_column(1, "a", int_type), |
| }; |
| const std::vector<SchemaField> file_schema = { |
| make_schema_field(10, "a", int_type), |
| make_schema_field(20, "b", int_type), |
| }; |
| |
| TableColumnMapperOptions options; |
| options.mode = TableColumnMappingMode::BY_INDEX; |
| TableColumnMapper mapper(options); |
| ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, file_schema).ok()); |
| |
| const auto& mappings = mapper.mappings(); |
| ASSERT_EQ(mappings.size(), 1); |
| ASSERT_TRUE(mappings[0].field_id.has_value()); |
| EXPECT_EQ(*mappings[0].field_id, 20); |
| EXPECT_EQ(mappings[0].file_column_name, "b"); |
| } |
| |
| } // namespace |
| } // namespace doris::reader |