blob: 39afb15cd4d7fe1d99471c06417e05bf25241695 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "paimon/common/reader/data_evolution_file_reader.h"
#include <map>
#include "arrow/api.h"
#include "arrow/array/array_base.h"
#include "arrow/c/abi.h"
#include "arrow/c/bridge.h"
#include "arrow/ipc/api.h"
#include "arrow/util/range.h"
#include "gtest/gtest.h"
#include "paimon/common/utils/arrow/mem_utils.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/testing/mock/mock_file_batch_reader.h"
#include "paimon/testing/utils/read_result_collector.h"
#include "paimon/testing/utils/testharness.h"
namespace paimon::test {
class DataEvolutionFileReaderTest : public ::testing::Test,
public ::testing::WithParamInterface<bool> {
public:
void SetUp() override {
pool_ = GetDefaultPool();
}
void TearDown() override {
pool_.reset();
}
void CheckResult(const arrow::ArrayVector& src_array_vec,
const std::shared_ptr<arrow::Schema>& read_schema,
const std::vector<int32_t>& reader_offsets,
const std::vector<int32_t>& field_offsets,
const std::shared_ptr<arrow::Array>& expected_array,
const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) const {
for (auto batch_size : arrow::internal::Iota(1, 10)) {
int32_t total_row_count = 0;
std::vector<std::unique_ptr<BatchReader>> readers;
for (const auto& array : src_array_vec) {
if (array == nullptr) {
// simulate no fields read from current reader
readers.push_back(nullptr);
continue;
}
total_row_count += array->length();
std::unique_ptr<MockFileBatchReader> file_batch_reader;
if (selection_bitmap) {
file_batch_reader = std::make_unique<MockFileBatchReader>(
array, array->type(), selection_bitmap.value(), batch_size);
} else {
file_batch_reader =
std::make_unique<MockFileBatchReader>(array, array->type(), batch_size);
}
auto enable_randomize_batch_size = GetParam();
file_batch_reader->EnableRandomizeBatchSize(enable_randomize_batch_size);
readers.push_back(std::move(file_batch_reader));
}
ASSERT_OK_AND_ASSIGN(
auto data_evolution_file_reader,
DataEvolutionFileReader::Create(std::move(readers), read_schema, batch_size,
reader_offsets, field_offsets, pool_));
// check metrics, data_evolution_file_reader collects all row of each
// MockFileBatchReader
auto metrics = data_evolution_file_reader->GetReaderMetrics();
ASSERT_EQ(metrics->ToString(),
"{\"mock.number.of.rows\":" + std::to_string(total_row_count) + "}");
// check result array
ASSERT_OK_AND_ASSIGN(
auto result_array,
paimon::test::ReadResultCollector::CollectResult(data_evolution_file_reader.get()));
data_evolution_file_reader->Close();
auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
ASSERT_TRUE(result_array->Equals(expected_chunk_array));
}
}
void CheckNextBatchForSingleReader(int32_t inner_batch_size, int32_t read_batch_size,
const std::shared_ptr<arrow::Array>& src_array,
const std::optional<RoaringBitmap32>& selection_bitmap,
const std::shared_ptr<arrow::Array>& expected_array) const {
std::unique_ptr<MockFileBatchReader> file_batch_reader;
if (selection_bitmap) {
file_batch_reader = std::make_unique<MockFileBatchReader>(
src_array, src_array->type(), selection_bitmap.value(), inner_batch_size);
} else {
file_batch_reader = std::make_unique<MockFileBatchReader>(src_array, src_array->type(),
inner_batch_size);
}
auto enable_randomize_batch_size = GetParam();
file_batch_reader->EnableRandomizeBatchSize(enable_randomize_batch_size);
std::vector<std::unique_ptr<BatchReader>> readers;
readers.push_back(std::move(file_batch_reader));
DataEvolutionFileReader fake_data_evolution_reader(
std::move(readers), /*read_schema=*/arrow::schema({}), read_batch_size,
/*reader_offsets=*/{}, /*field_offsets=*/{}, GetArrowPool(pool_));
arrow::ArrayVector result_array_vec;
while (true) {
ASSERT_OK_AND_ASSIGN(auto result_array,
fake_data_evolution_reader.NextBatchForSingleReader(0));
if (result_array == nullptr) {
break;
}
result_array_vec.push_back(result_array);
}
ASSERT_EQ(result_array_vec.size(),
std::ceil(static_cast<double>(expected_array->length()) / read_batch_size));
// except for last batch, the length each array is expected to be aligned to read_batch_size
for (size_t i = 0; i < result_array_vec.size() - 1; i++) {
ASSERT_EQ(result_array_vec[i]->length(), read_batch_size);
}
if (expected_array->length() % read_batch_size == 0) {
ASSERT_EQ(result_array_vec.back()->length(), read_batch_size);
} else {
ASSERT_EQ(result_array_vec.back()->length(),
expected_array->length() % read_batch_size);
}
auto result_chunk_array = std::make_shared<arrow::ChunkedArray>(result_array_vec);
auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
ASSERT_TRUE(result_chunk_array->Equals(expected_chunk_array));
}
private:
std::shared_ptr<MemoryPool> pool_;
};
TEST_F(DataEvolutionFileReaderTest, TestInvalid) {
{
arrow::FieldVector read_fields;
auto read_schema = arrow::schema(read_fields);
ASSERT_NOK_WITH_MSG(
DataEvolutionFileReader::Create({}, read_schema, /*read_batch_size=*/10, {}, {}, pool_),
"read schema must not be empty");
}
{
arrow::FieldVector read_fields = {
arrow::field("f0", arrow::int32()),
arrow::field("f1", arrow::int32()),
arrow::field("f2", arrow::utf8()),
arrow::field("f3", arrow::int32()),
};
auto read_schema = arrow::schema(read_fields);
std::vector<int32_t> reader_offsets = {0, 0, 1};
std::vector<int32_t> field_offsets = {0, 1, 0};
ASSERT_NOK_WITH_MSG(DataEvolutionFileReader::Create({}, read_schema, /*read_batch_size=*/10,
reader_offsets, field_offsets, pool_),
"read schema, row offsets and field offsets must have the same size");
}
{
std::vector<std::unique_ptr<BatchReader>> readers;
readers.push_back(nullptr);
arrow::FieldVector read_fields = {
arrow::field("f0", arrow::int32()),
arrow::field("f1", arrow::int32()),
arrow::field("f2", arrow::utf8()),
arrow::field("f3", arrow::int32()),
};
auto read_schema = arrow::schema(read_fields);
std::vector<int32_t> reader_offsets = {0, 0, 1, 1};
std::vector<int32_t> field_offsets = {0, 1, 1, 0};
ASSERT_NOK_WITH_MSG(
DataEvolutionFileReader::Create(std::move(readers), read_schema, /*read_batch_size=*/10,
reader_offsets, field_offsets, pool_),
"readers size is supposed to be more than 1");
}
}
TEST_P(DataEvolutionFileReaderTest, TestNextBatchForSingleReader) {
auto prepare_array = [](int64_t array_length) -> std::shared_ptr<arrow::Array> {
auto array_builder = std::make_shared<arrow::Int32Builder>();
for (int32_t i = 0; i < array_length; ++i) {
EXPECT_TRUE(array_builder->Append(i).ok());
}
std::shared_ptr<arrow::Array> array;
EXPECT_TRUE(array_builder->Finish(&array).ok());
return array;
};
auto prepare_array_with_bitmap =
[](const RoaringBitmap32& bitmap) -> std::shared_ptr<arrow::Array> {
auto array_builder = std::make_shared<arrow::Int32Builder>();
for (auto iter = bitmap.Begin(); iter != bitmap.End(); ++iter) {
EXPECT_TRUE(array_builder->Append(*iter).ok());
}
std::shared_ptr<arrow::Array> array;
EXPECT_TRUE(array_builder->Finish(&array).ok());
return array;
};
{
// src array length = 10, read batch size = 10
auto src_array = prepare_array(10);
for (int32_t inner_batch_size : arrow::internal::Iota(1, 10)) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/10, src_array,
/*selection_bitmap=*/std::nullopt,
/*expected_array=*/src_array);
}
}
{
// src array length = 10, read batch size = 6
auto src_array = prepare_array(10);
for (int32_t inner_batch_size : arrow::internal::Iota(1, 6)) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/6, src_array,
/*selection_bitmap=*/std::nullopt,
/*expected_array=*/src_array);
}
}
{
// src array length = 10, read batch size = 15
auto src_array = prepare_array(10);
for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
/*selection_bitmap=*/std::nullopt,
/*expected_array=*/src_array);
}
}
{
// test bulk data, src array length = 10000, read batch size = 1024
auto src_array = prepare_array(10000);
for (int32_t inner_batch_size : {1, 2, 8, 16, 20, 100, 1024}) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/1024, src_array,
/*selection_bitmap=*/std::nullopt,
/*expected_array=*/src_array);
}
}
{
// src array length = 10, selection bitmap = {1, 3, 5}
auto src_array = prepare_array(10);
RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({1, 3, 5});
auto expected_array = prepare_array_with_bitmap(selected_bitmap);
for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
selected_bitmap, expected_array);
}
}
{
// src array length = 10, selection all
auto src_array = prepare_array(10);
RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
selected_bitmap, /*expected_array=*/src_array);
}
}
{
// src array length = 10, selection first
auto src_array = prepare_array(10);
RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0});
auto expected_array = prepare_array_with_bitmap(selected_bitmap);
for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
selected_bitmap, expected_array);
}
}
{
// src array length = 10, selection first
auto src_array = prepare_array(10);
RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({9});
auto expected_array = prepare_array_with_bitmap(selected_bitmap);
for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
selected_bitmap, expected_array);
}
}
{
// src array length = 10, selection consecutive positions
auto src_array = prepare_array(10);
RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({2, 3, 4, 5});
auto expected_array = prepare_array_with_bitmap(selected_bitmap);
for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
selected_bitmap, expected_array);
}
}
{
auto src_array = prepare_array(10);
RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0, 1, 2, 3, 4, 5, 7, 8, 9});
auto expected_array = prepare_array_with_bitmap(selected_bitmap);
// inner batch: [0, 1, 2, 3] | [4, 5] [7, 8] | [9]
CheckNextBatchForSingleReader(/*inner_batch_size=*/4, /*read_batch_size=*/5, src_array,
selected_bitmap, expected_array);
}
{
// test bulk data, src array length = 10000, read batch size = 1024
auto src_array = prepare_array(10000);
RoaringBitmap32 selected_bitmap =
RoaringBitmap32::From({0, 10, 1000, 2333, 4566, 7838, 8787, 9999});
auto expected_array = prepare_array_with_bitmap(selected_bitmap);
for (int32_t inner_batch_size : {1, 2, 8, 16, 20, 100, 1024}) {
CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/1024, src_array,
selected_bitmap, expected_array);
}
}
}
TEST_P(DataEvolutionFileReaderTest, TestSimple) {
arrow::FieldVector read_fields = {
arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
arrow::field("f2", arrow::utf8()), arrow::field("f3", arrow::int32()),
arrow::field("f4", arrow::utf8()), arrow::field("f5", arrow::int32()),
};
auto read_schema = arrow::schema(read_fields);
std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1};
std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0};
auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[0], read_fields[2]}), R"([
[0, "00"],
[1, "01"],
[2, "02"],
[3, "03"],
[4, "04"],
[5, "05"]
])")
.ValueOrDie();
auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[5], read_fields[3]}), R"([
[10, 110],
[11, 111],
[12, 112],
[13, 113],
[14, 114],
[15, 115]
])")
.ValueOrDie();
auto array2 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[1], read_fields[4]}), R"([
[20, "20"],
[21, "21"],
[22, "22"],
[23, "23"],
[24, "24"],
[25, "25"]
])")
.ValueOrDie();
auto expected_array =
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), R"([
[0, 20, "00", 110, "20", 10],
[1, 21, "01", 111, "21", 11],
[2, 22, "02", 112, "22", 12],
[3, 23, "03", 113, "23", 13],
[4, 24, "04", 114, "24", 14],
[5, 25, "05", 115, "25", 15]
])")
.ValueOrDie();
CheckResult({array0, array1, array2}, read_schema, reader_offsets, field_offsets,
expected_array);
}
TEST_P(DataEvolutionFileReaderTest, TestWithNonExistField) {
arrow::FieldVector read_fields = {
arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
arrow::field("f2", arrow::utf8()), arrow::field("f3", arrow::int32()),
arrow::field("f4", arrow::utf8()), arrow::field("f5", arrow::int32()),
arrow::field("non-field", arrow::int32()),
};
auto read_schema = arrow::schema(read_fields);
std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1, -1};
std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0, -1};
auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[0], read_fields[2]}), R"([
[0, "00"],
[1, "01"],
[2, "02"],
[3, "03"],
[4, "04"],
[5, "05"]
])")
.ValueOrDie();
auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[5], read_fields[3]}), R"([
[10, 110],
[11, 111],
[12, 112],
[13, 113],
[14, 114],
[15, 115]
])")
.ValueOrDie();
auto array2 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[1], read_fields[4]}), R"([
[20, "20"],
[21, "21"],
[22, "22"],
[23, "23"],
[24, "24"],
[25, "25"]
])")
.ValueOrDie();
auto expected_array =
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), R"([
[0, 20, "00", 110, "20", 10, null],
[1, 21, "01", 111, "21", 11, null],
[2, 22, "02", 112, "22", 12, null],
[3, 23, "03", 113, "23", 13, null],
[4, 24, "04", 114, "24", 14, null],
[5, 25, "05", 115, "25", 15, null]
])")
.ValueOrDie();
CheckResult({array0, array1, array2}, read_schema, reader_offsets, field_offsets,
expected_array);
}
TEST_P(DataEvolutionFileReaderTest, TestReadFromPartialReaders) {
arrow::FieldVector read_fields = {
arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
arrow::field("f2", arrow::utf8()), arrow::field("f3", arrow::int32()),
arrow::field("f4", arrow::utf8()), arrow::field("f5", arrow::int32()),
};
auto read_schema = arrow::schema(read_fields);
// simulate reader2 has no field to read
std::vector<int32_t> reader_offsets = {0, 3, 0, 1, 3, 1};
std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0};
auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[0], read_fields[2]}), R"([
[0, "00"],
[1, "01"],
[2, "02"],
[3, "03"],
[4, "04"],
[5, "05"]
])")
.ValueOrDie();
auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[5], read_fields[3]}), R"([
[10, 110],
[11, 111],
[12, 112],
[13, 113],
[14, 114],
[15, 115]
])")
.ValueOrDie();
auto array3 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[1], read_fields[4]}), R"([
[20, "20"],
[21, "21"],
[22, "22"],
[23, "23"],
[24, "24"],
[25, "25"]
])")
.ValueOrDie();
auto expected_array =
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), R"([
[0, 20, "00", 110, "20", 10],
[1, 21, "01", 111, "21", 11],
[2, 22, "02", 112, "22", 12],
[3, 23, "03", 113, "23", 13],
[4, 24, "04", 114, "24", 14],
[5, 25, "05", 115, "25", 15]
])")
.ValueOrDie();
CheckResult({array0, array1, nullptr, array3}, read_schema, reader_offsets, field_offsets,
expected_array);
}
TEST_P(DataEvolutionFileReaderTest, TestNestedType) {
arrow::FieldVector read_fields = {
arrow::field("f1", arrow::map(arrow::int8(), arrow::int16())),
arrow::field("f2", arrow::list(arrow::float32())),
arrow::field("f3", arrow::struct_({arrow::field("f0", arrow::boolean()),
arrow::field("f1", arrow::int64())})),
arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
arrow::field("f5", arrow::date32()),
arrow::field("f6", arrow::decimal128(2, 2))};
auto read_schema = arrow::schema(read_fields);
std::vector<int32_t> reader_offsets = {0, 1, 0, 1, 0, 1};
std::vector<int32_t> field_offsets = {2, 0, 1, 1, 0, 2};
auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[4], read_fields[2], read_fields[0]}), R"([
[2456, [true, 2], [[0, 0]]],
[24, [true, 1], [[0, 1]]],
[2456, [false, 12], [[10, 10]]],
[245, [false, 2222], [[127, 32767], [-128, -32768]]],
[24, [true, 2], [[1, 64], [2, 32]]],
[24, [true, 2], [[11, 64], [12, 32]]]
])")
.ValueOrDie();
auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[1], read_fields[3], read_fields[5]}), R"([
[[0.1, 0.2], "1970-01-01 00:02:03.123123", "0.22"],
[[0.1, 0.3], "1970-01-01 00:02:03.999999", "0.28"],
[[1.1, 1.2], "1970-01-01 00:02:03.123123", "0.22"],
[[1.1, 1.2], "1970-01-01 00:02:03.123123", "0.12"],
[[2.2, 3.2], "1970-01-01 00:00:00.0", "0.78"],
[[2.2, 3.2], "1970-01-01 00:00:00.123123", "0.78"]
])")
.ValueOrDie();
auto expected_array =
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), R"([
[[[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123123", 2456, "0.22"],
[[[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999999", 24, "0.28"],
[[[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123123", 2456, "0.22"],
[[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123123", 245, "0.12"],
[[[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"],
[[[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.123123", 24, "0.78"]
])")
.ValueOrDie();
CheckResult({array0, array1}, read_schema, reader_offsets, field_offsets, expected_array);
}
TEST_P(DataEvolutionFileReaderTest, TestWithBitmap) {
arrow::FieldVector read_fields = {
arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
arrow::field("f2", arrow::utf8()), arrow::field("f3", arrow::int32()),
arrow::field("f4", arrow::utf8()), arrow::field("f5", arrow::int32()),
arrow::field("non-exist", arrow::int32())};
auto read_schema = arrow::schema(read_fields);
std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1, -1};
std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0, -1};
RoaringBitmap32 selection_bitmap = RoaringBitmap32::From({1, 3, 5});
auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[0], read_fields[2]}), R"([
[0, "00"],
[1, "01"],
[2, "02"],
[3, "03"],
[4, "04"],
[5, "05"]
])")
.ValueOrDie();
auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[5], read_fields[3]}), R"([
[10, 110],
[11, 111],
[12, 112],
[13, 113],
[14, 114],
[15, 115]
])")
.ValueOrDie();
auto array2 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[1], read_fields[4]}), R"([
[20, "20"],
[21, "21"],
[22, "22"],
[23, "23"],
[24, "24"],
[25, "25"]
])")
.ValueOrDie();
auto expected_array =
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), R"([
[1, 21, "01", 111, "21", 11, null],
[3, 23, "03", 113, "23", 13, null],
[5, 25, "05", 115, "25", 15, null]
])")
.ValueOrDie();
CheckResult({array0, array1, array2}, read_schema, reader_offsets, field_offsets,
expected_array, selection_bitmap);
}
TEST_P(DataEvolutionFileReaderTest, TestSingleReaderRowCountMismatch) {
arrow::FieldVector read_fields = {
arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
arrow::field("f2", arrow::utf8()), arrow::field("f3", arrow::int32())};
auto read_schema = arrow::schema(read_fields);
std::vector<int32_t> reader_offsets = {0, 1, 0, 1};
std::vector<int32_t> field_offsets = {0, 0, 1, 1};
auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[0], read_fields[2]}), R"([
[0, "00"],
[1, "01"],
[2, "02"],
[3, "03"],
[4, "04"],
[5, "05"]
])")
.ValueOrDie();
auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({read_fields[1], read_fields[3]}), R"([
[10, 110],
[11, 111],
[12, 112],
[13, 113],
[14, 114]
])")
.ValueOrDie();
std::vector<std::unique_ptr<BatchReader>> readers;
for (const auto& array : {array0, array1}) {
auto file_batch_reader =
std::make_unique<MockFileBatchReader>(array, array->type(), /*read_batch_size=*/10);
auto enable_randomize_batch_size = GetParam();
file_batch_reader->EnableRandomizeBatchSize(enable_randomize_batch_size);
readers.push_back(std::move(file_batch_reader));
}
ASSERT_OK_AND_ASSIGN(
auto data_evolution_file_reader,
DataEvolutionFileReader::Create(std::move(readers), read_schema, /*read_batch_size=*/10,
reader_offsets, field_offsets, pool_));
// array0 has 6 rows but array1 only has 5 rows
ASSERT_NOK_WITH_MSG(
paimon::test::ReadResultCollector::CollectResult(data_evolution_file_reader.get()),
"array for single reader length mismatch others");
}
INSTANTIATE_TEST_SUITE_P(EnableRandomizeBatchSize, DataEvolutionFileReaderTest,
::testing::ValuesIn({true, false}));
} // namespace paimon::test