blob: e9832c194373e3989161c6833db08330d457d267 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
#include <set>
#include "arrow/compute/api.h"
#include "arrow/ipc/api.h"
#include "gtest/gtest.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/executor.h"
#include "paimon/format/file_format.h"
#include "paimon/format/file_format_factory.h"
#include "paimon/format/format_writer.h"
#include "paimon/fs/file_system_factory.h"
#include "paimon/fs/local/local_file_system.h"
#include "paimon/predicate/predicate_builder.h"
#include "paimon/testing/mock/mock_file_batch_reader.h"
#include "paimon/testing/mock/mock_file_system.h"
#include "paimon/testing/mock/mock_format_reader_builder.h"
#include "paimon/testing/utils/read_result_collector.h"
#include "paimon/testing/utils/testharness.h"
#include "paimon/utils/read_ahead_cache.h"
namespace paimon::test {
/// MockFileBatchReader whose GenReadRanges() answer and SetReadRanges() outcome are fixed at
/// construction time, so tests can steer PrefetchFileBatchReaderImpl into specific paths.
class ControlledMockFileBatchReader : public MockFileBatchReader {
public:
    using RangeList = std::vector<std::pair<uint64_t, uint64_t>>;

    /// @param data, file_schema, read_batch_size  forwarded unchanged to MockFileBatchReader.
    /// @param read_ranges  ranges reported by GenReadRanges().
    /// @param need_prefetch  value GenReadRanges() writes through its out-parameter.
    /// @param set_read_ranges_status  if not OK, SetReadRanges() returns it instead of
    ///        delegating to MockFileBatchReader::SetReadRanges().
    ControlledMockFileBatchReader(const std::shared_ptr<arrow::Array>& data,
                                  const std::shared_ptr<arrow::DataType>& file_schema,
                                  int32_t read_batch_size, RangeList read_ranges,
                                  bool need_prefetch, Status set_read_ranges_status = Status::OK())
        : MockFileBatchReader(data, file_schema, read_batch_size),
          read_ranges_(std::move(read_ranges)),
          need_prefetch_(need_prefetch),
          set_read_ranges_status_(std::move(set_read_ranges_status)) {}

    /// Reports the configured ranges and prefetch hint.
    Result<RangeList> GenReadRanges(bool* need_prefetch) const override {
        *need_prefetch = need_prefetch_;
        return read_ranges_;
    }

    /// Returns the injected failure if one was configured, otherwise behaves like the base mock.
    Status SetReadRanges(const RangeList& read_ranges) override {
        if (set_read_ranges_status_.ok()) {
            return MockFileBatchReader::SetReadRanges(read_ranges);
        }
        return set_read_ranges_status_;
    }

private:
    RangeList read_ranges_;
    bool need_prefetch_ = true;
    Status set_read_ranges_status_;
};
/// ReaderBuilder producing ControlledMockFileBatchReader instances. Every reader shares the same
/// data, schema, batch size, read ranges and need_prefetch flag. The SetReadRanges() status is
/// scripted per reader: the k-th Build() call receives set_read_ranges_statuses[k], and
/// Status::OK() once the list is exhausted.
class ControlledMockFormatReaderBuilder : public ReaderBuilder {
public:
    using RangeList = std::vector<std::pair<uint64_t, uint64_t>>;

    ControlledMockFormatReaderBuilder(const std::shared_ptr<arrow::Array>& data,
                                      const std::shared_ptr<arrow::DataType>& schema,
                                      int32_t read_batch_size, RangeList read_ranges,
                                      bool need_prefetch,
                                      std::vector<Status> set_read_ranges_statuses)
        : data_(data),
          schema_(schema),
          read_batch_size_(read_batch_size),
          read_ranges_(std::move(read_ranges)),
          need_prefetch_(need_prefetch),
          set_read_ranges_statuses_(std::move(set_read_ranges_statuses)) {}

    /// The mock ignores the pool and returns itself.
    ReaderBuilder* WithMemoryPool(const std::shared_ptr<MemoryPool>& pool) override {
        return this;
    }

    /// Creates the next scripted reader; the input stream itself is not read.
    Result<std::unique_ptr<FileBatchReader>> Build(
        const std::shared_ptr<InputStream>& path) const override {
        const size_t ordinal = build_count_;
        build_count_ += 1;
        const bool has_scripted_status = ordinal < set_read_ranges_statuses_.size();
        Status injected_status =
            has_scripted_status ? set_read_ranges_statuses_[ordinal] : Status::OK();
        return std::make_unique<ControlledMockFileBatchReader>(data_, schema_, read_batch_size_,
                                                               read_ranges_, need_prefetch_,
                                                               injected_status);
    }

    /// Path-based construction is intentionally unsupported by this mock.
    Result<std::unique_ptr<FileBatchReader>> Build(const std::string& path) const override {
        return Status::Invalid("do not support build reader with path in mock format");
    }

private:
    std::shared_ptr<arrow::Array> data_;
    std::shared_ptr<arrow::DataType> schema_;
    int32_t read_batch_size_ = 0;
    RangeList read_ranges_;
    bool need_prefetch_ = true;
    std::vector<Status> set_read_ranges_statuses_;
    // Number of Build(InputStream) calls so far; mutable because Build() is const.
    mutable size_t build_count_ = 0;
};
/// Parameters for the value-parameterized tests of PrefetchFileBatchReaderImplTest
/// (see PrepareTestParam()).
struct TestParam {
    // File format name, e.g. "parquet" or "orc".
    std::string file_format;
    // Prefetch cache mode the reader under test is created with.
    PrefetchCacheMode cache_mode;
};
/// Fixture for the PrefetchFileBatchReaderImpl tests. SetUp() prepares:
/// - a three-column struct type (f0: utf8, f1: int64, f2: boolean);
/// - a MockFileSystem and a LocalFileSystem;
/// - a two-thread executor;
/// - a unique scratch directory.
/// The fixture is also value-parameterized by TestParam (file format + prefetch cache mode).
class PrefetchFileBatchReaderImplTest : public ::testing::Test,
                                        public ::testing::WithParamInterface<TestParam> {
public:
    void SetUp() override {
        fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int64()),
                   arrow::field("f2", arrow::boolean())};
        data_type_ = arrow::struct_(fields_);
        mock_fs_ = std::make_shared<MockFileSystem>();
        local_fs_ = std::make_shared<LocalFileSystem>();
        executor_ = CreateDefaultExecutor(/*thread_count=*/2);
        dir_ = ::paimon::test::UniqueTestDirectory::Create();
        ASSERT_TRUE(dir_);
    }

    void TearDown() override {}

    /// Builds a struct array of `length` rows of `data_type_`. Row ids run from `offset` to
    /// `offset + length - 1`; the row with id i is {"str_<i>", i, i is odd}.
    std::shared_ptr<arrow::Array> PrepareArray(int32_t length, int32_t offset = 0) {
        arrow::StructBuilder struct_builder(
            data_type_, arrow::default_memory_pool(),
            {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int64Builder>(),
             std::make_shared<arrow::BooleanBuilder>()});
        auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
        auto big_int_builder = static_cast<arrow::Int64Builder*>(struct_builder.field_builder(1));
        auto bool_builder = static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
        for (int32_t i = offset; i < length + offset; ++i) {
            EXPECT_TRUE(struct_builder.Append().ok());
            EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok());
            EXPECT_TRUE(big_int_builder->Append(i).ok());
            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
        }
        std::shared_ptr<arrow::Array> array;
        EXPECT_TRUE(struct_builder.Finish(&array).ok());
        return array;
    }

    /// Writes `array` to "<dir_>/file.<file_format_str>" using the given format.
    /// Rows are added in batches of `row_index_stride` rows. `row_index_stride` is also passed
    /// as "parquet.write.max-row-group-length" and "orc.row.index.stride". The writer is built
    /// with "zstd" (presumably the compression codec).
    /// Requires array->length() to be a multiple of `row_index_stride`.
    /// NOTE(review): `stripe_row_count` is currently not used by this helper.
    void PrepareTestData(const std::string& file_format_str,
                         const std::shared_ptr<arrow::Array>& array, int32_t stripe_row_count,
                         int32_t row_index_stride) const {
        // for simple case, assume that array.length() % row_index_stride == 0
        ASSERT_EQ(array->length() % row_index_stride, 0);
        arrow::Schema schema(array->type()->fields());
        ::ArrowSchema c_schema;
        ASSERT_TRUE(arrow::ExportSchema(schema, &c_schema).ok());
        ASSERT_OK_AND_ASSIGN(
            std::unique_ptr<FileFormat> file_format,
            FileFormatFactory::Get(
                file_format_str,
                {{"parquet.write.max-row-group-length", std::to_string(row_index_stride)},
                 {"orc.row.index.stride", std::to_string(row_index_stride)}}));
        ASSERT_OK_AND_ASSIGN(auto writer_builder,
                             file_format->CreateWriterBuilder(&c_schema, 1024));
        ASSERT_OK_AND_ASSIGN(
            std::shared_ptr<OutputStream> out,
            local_fs_->Create(PathUtil::JoinPath(dir_->Str(), "file." + file_format_str),
                              /*overwrite=*/false));
        ASSERT_OK_AND_ASSIGN(auto writer, writer_builder->Build(out, "zstd"));
        int32_t write_batch_count = array->length() / row_index_stride;
        for (int32_t i = 0; i < write_batch_count; i++) {
            auto slice = array->Slice(i * row_index_stride, row_index_stride);
            // Concatenate a single slice to obtain a compact copy starting at offset 0.
            auto copied_array = arrow::Concatenate({slice}).ValueOr(nullptr);
            ASSERT_TRUE(copied_array);
            ::ArrowArray c_array;
            ASSERT_TRUE(arrow::ExportArray(*copied_array, &c_array).ok());
            ASSERT_OK(writer->AddBatch(&c_array));
        }
        ASSERT_OK(writer->Flush());
        ASSERT_OK(writer->Finish());
        ASSERT_OK(out->Flush());
        ASSERT_OK(out->Close());
    }

    /// Creates a prefetch reader over "<dir_>/file.<format identifier>" (as written by
    /// PrepareTestData()). It then applies `read_schema`, `predicate` and `selection_bitmap`
    /// via SetReadSchema().
    /// The reader is created with initialize_read_ranges=false and
    /// prefetch_batch_count = 2 * prefetch_max_parallel_num. It gets its own executor with
    /// `prefetch_max_parallel_num - 1` threads.
    /// NOTE(review): `reader_builder` is a local destroyed on return while the reader received
    /// a raw pointer to it; confirm the reader does not retain that pointer.
    std::unique_ptr<PrefetchFileBatchReaderImpl> PreparePrefetchReader(
        const std::string& file_format_str, const arrow::Schema* read_schema,
        const std::shared_ptr<Predicate>& predicate,
        const std::optional<RoaringBitmap32>& selection_bitmap, int32_t batch_size,
        int32_t prefetch_max_parallel_num, PrefetchCacheMode cache_mode) const {
        EXPECT_OK_AND_ASSIGN(std::unique_ptr<FileFormat> file_format,
                             FileFormatFactory::Get(file_format_str, {}));
        EXPECT_OK_AND_ASSIGN(auto reader_builder, file_format->CreateReaderBuilder(batch_size));
        EXPECT_OK_AND_ASSIGN(
            std::unique_ptr<PrefetchFileBatchReaderImpl> reader,
            PrefetchFileBatchReaderImpl::Create(
                PathUtil::JoinPath(dir_->Str(), "file." + file_format->Identifier()),
                reader_builder.get(), local_fs_, prefetch_max_parallel_num, batch_size,
                prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false,
                CreateDefaultExecutor(prefetch_max_parallel_num - 1),
                /*initialize_read_ranges=*/false, cache_mode, CacheConfig(), GetDefaultPool()));
        std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
        EXPECT_TRUE(arrow_status.ok());
        EXPECT_OK(reader->SetReadSchema(c_schema.get(), predicate, selection_bitmap));
        return reader;
    }

    /// Returns true if at least one prefetch queue currently holds a batch.
    bool HasValue(const std::vector<
                  std::unique_ptr<ThreadsafeQueue<PrefetchFileBatchReaderImpl::PrefetchBatch>>>&
                      prefetch_queues) {
        for (const auto& queue : prefetch_queues) {
            if (!queue->empty()) {
                return true;
            }
        }
        return false;
    }

    /// Returns whether two chunked arrays are equal. Two null pointers compare equal; a null
    /// vs. non-null pair does not. On mismatch both sides are printed to stdout.
    bool CheckEqual(const std::shared_ptr<arrow::ChunkedArray>& lhs,
                    const std::shared_ptr<arrow::ChunkedArray>& rhs) {
        if (!lhs || !rhs) {
            // Guard against dereferencing null: empty reads produce a null result array.
            const bool both_null = !lhs && !rhs;
            if (!both_null) {
                std::cout << "lhs array is " << (lhs ? "non-null" : "null") << ", rhs array is "
                          << (rhs ? "non-null" : "null") << '\n';
            }
            return both_null;
        }
        if (lhs->Equals(rhs)) {
            return true;
        }
        // Only pay for pretty-printing when there is a mismatch to report.
        std::string lhs_str;
        std::string rhs_str;
        auto print_option = arrow::PrettyPrintOptions::Defaults();
        print_option.window = 1000;
        print_option.container_window = 1000;
        EXPECT_TRUE(arrow::PrettyPrint(*lhs, print_option, &lhs_str).ok());
        EXPECT_TRUE(arrow::PrettyPrint(*rhs, print_option, &rhs_str).ok());
        std::cout << "lhs array: " << lhs_str << ", rhs array: " << rhs_str << '\n';
        return false;
    }

protected:
    // Protected rather than private: TEST_F/TEST_P bodies are generated as subclasses of this
    // fixture and read these members directly (e.g. data_type_, mock_fs_, executor_).
    arrow::FieldVector fields_;
    std::shared_ptr<arrow::DataType> data_type_;
    std::shared_ptr<FileSystem> mock_fs_;
    std::shared_ptr<FileSystem> local_fs_;
    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
    std::shared_ptr<Executor> executor_;
};
std::vector<TestParam> PrepareTestParam() {
std::vector<TestParam> values = {
TestParam{"parquet", PrefetchCacheMode::ALWAYS},
TestParam{"parquet", PrefetchCacheMode::EXCLUDE_BITMAP},
TestParam{"parquet", PrefetchCacheMode::EXCLUDE_PREDICATE},
TestParam{"parquet", PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE},
TestParam{"parquet", PrefetchCacheMode::NEVER}};
#ifdef PAIMON_ENABLE_ORC
values.emplace_back(TestParam{"orc", PrefetchCacheMode::ALWAYS});
values.emplace_back(TestParam{"orc", PrefetchCacheMode::EXCLUDE_BITMAP});
values.emplace_back(TestParam{"orc", PrefetchCacheMode::EXCLUDE_PREDICATE});
values.emplace_back(TestParam{"orc", PrefetchCacheMode::EXCLUDE_BITMAP_OR_PREDICATE});
values.emplace_back(TestParam{"orc", PrefetchCacheMode::NEVER});
#endif
return values;
}
// Instantiates the value-parameterized tests of this suite once per entry returned by
// PrepareTestParam() (file format x prefetch cache mode). The first argument is only the
// instantiation prefix used in generated test names; it merely shares its spelling with the
// TestParam struct.
INSTANTIATE_TEST_SUITE_P(TestParam, PrefetchFileBatchReaderImplTest,
                         ::testing::ValuesIn(PrepareTestParam()));
// Reads the whole 101-row mock file through the prefetch reader for several parallelism
// levels and checks that the rows come back complete and in order.
TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) {
    auto data_array = PrepareArray(101);
    int32_t batch_size = 10;
    for (auto prefetch_max_parallel_num : {1, 2, 3, 5, 8, 10}) {
        // Label any failure below with the parallelism level of the current iteration.
        SCOPED_TRACE("prefetch_max_parallel_num=" + std::to_string(prefetch_max_parallel_num));
        MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
        ASSERT_OK_AND_ASSIGN(
            auto reader,
            PrefetchFileBatchReaderImpl::Create(
                /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num,
                batch_size, prefetch_max_parallel_num * 2,
                /*enable_adaptive_prefetch_strategy=*/false, executor_,
                /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
                CacheConfig(), GetDefaultPool()));
        // No batch has been returned yet.
        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
        ASSERT_OK_AND_ASSIGN(auto result_array,
                             ReadResultCollector::CollectResult(
                                 reader.get(), /*max simulated data processing time*/ 100));
        // After draining, the previous-batch row number equals the total row count.
        ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);
        auto expected_array = std::make_shared<arrow::ChunkedArray>(data_array);
        ASSERT_TRUE(result_array->Equals(expected_array));
    }
}
// Consumes only the first 8 batches (as a LIMIT would), closes the reader before it is
// exhausted, and then checks that reader metrics are still available.
TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLimits) {
    constexpr int32_t kBatchSize = 10;
    constexpr int32_t kParallelNum = 12;
    constexpr int32_t kBatchesToConsume = 8;
    auto rows = PrepareArray(101);
    MockFormatReaderBuilder builder(rows, data_type_, kBatchSize);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    for (int32_t consumed = 0; consumed < kBatchesToConsume; ++consumed) {
        ASSERT_OK_AND_ASSIGN(BatchReader::ReadBatchWithBitmap next_batch,
                             reader->NextBatchWithBitmap());
        auto& [read_batch, row_bitmap] = next_batch;
        // Every row of the batch must be selected by its bitmap.
        ASSERT_EQ(read_batch.first->length, row_bitmap.Cardinality());
        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> consumed_array,
                             ReadResultCollector::GetArray(std::move(read_batch)));
        ASSERT_TRUE(consumed_array);
    }
    reader->Close();
    auto metrics = reader->GetReaderMetrics();
    ASSERT_TRUE(metrics);
}
// The reader is created with initialize_read_ranges=false and no ranges are set afterwards.
// Reading in that state must fail with a clear error instead of returning data.
TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithoutInitializeReadRanges) {
    auto data_array = PrepareArray(101);
    int32_t batch_size = 10;
    int32_t prefetch_max_parallel_num = 12;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
            prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    // Neither SetReadRanges() nor RefreshReadRanges() was called, so the first read is rejected.
    ASSERT_NOK_WITH_MSG(reader->NextBatchWithBitmap(),
                        "prefetch reader read ranges are not initialized");
    reader->Close();
}
// Without a selection bitmap nothing can be pruned, so every range must be kept unchanged.
TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithoutBitmap) {
    // Ten consecutive 1000-row ranges covering [0, 10000).
    std::vector<std::pair<uint64_t, uint64_t>> read_ranges;
    for (uint64_t begin = 0; begin < 10000; begin += 1000) {
        read_ranges.emplace_back(begin, begin + 1000);
    }
    auto kept_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, std::nullopt);
    ASSERT_EQ(kept_ranges, read_ranges);
}
// A bitmap that selects no rows makes every range unnecessary, so nothing survives filtering.
TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithAllZeroBitmap) {
    // Ten consecutive 1000-row ranges covering [0, 10000).
    std::vector<std::pair<uint64_t, uint64_t>> read_ranges;
    for (uint64_t begin = 0; begin < 10000; begin += 1000) {
        read_ranges.emplace_back(begin, begin + 1000);
    }
    auto empty_selection = RoaringBitmap32::From({});
    auto kept_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, empty_selection);
    ASSERT_TRUE(kept_ranges.empty());
}
// Rows [1000, 2000) and [3000, 6500) are selected. Only ranges containing at least one
// selected row survive, including [6000, 7000), which is only half covered.
TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithBitmap) {
    // Selected row ids, produced directly in ascending order without duplicates. No data array
    // or std::set is needed for that.
    std::vector<int32_t> bitmap_data;
    bitmap_data.reserve((2000 - 1000) + (6500 - 3000));
    for (int32_t i = 1000; i < 2000; i++) {
        bitmap_data.push_back(i);
    }
    for (int32_t i = 3000; i < 6500; i++) {
        bitmap_data.push_back(i);
    }
    auto bitmap = RoaringBitmap32::From(bitmap_data);
    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
        {0, 1000},    {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000},
        {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}};
    auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap);
    std::vector<std::pair<uint64_t, uint64_t>> expected_filtered_ranges = {
        {1000, 2000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}};
    ASSERT_EQ(expected_filtered_ranges, filtered_ranges);
}
// Dispatching no ranges still yields one group per requested reader, and each group is empty.
TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRangesEmpty) {
    std::vector<std::pair<uint64_t, uint64_t>> no_ranges;
    auto groups = PrefetchFileBatchReaderImpl::DispatchReadRanges(no_ranges, 3);
    ASSERT_EQ(groups.size(), 3);
    for (size_t group_idx = 0; group_idx < 3; ++group_idx) {
        ASSERT_TRUE(groups[group_idx].empty());
    }
}
// Four ranges over three groups are dealt out round-robin: range i lands in group i % 3.
TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRanges) {
    using RangeList = std::vector<std::pair<uint64_t, uint64_t>>;
    RangeList read_ranges = {{0, 10000}, {10000, 20000}, {20000, 30000}, {30000, 40000}};
    auto groups = PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3);
    const std::vector<RangeList> expected_groups = {
        RangeList{{0, 10000}, {30000, 40000}}, RangeList{{10000, 20000}},
        RangeList{{20000, 30000}}};
    for (size_t group_idx = 0; group_idx < expected_groups.size(); ++group_idx) {
        ASSERT_EQ(groups[group_idx], expected_groups[group_idx]);
    }
}
// RefreshReadRanges() distributes the generated ranges over the prefetch readers round-robin.
// With 101 rows, batch size 30 and 3 readers:
// - reader 0 gets [0, 30) and [90, 101);
// - reader 1 gets [30, 60);
// - reader 2 gets [60, 90).
TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) {
    using RangeList = std::vector<std::pair<uint64_t, uint64_t>>;
    auto data_array = PrepareArray(101);
    int32_t batch_size = 30;
    int32_t prefetch_max_parallel_num = 3;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
            prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
    ASSERT_NE(prefetch_reader, nullptr);
    ASSERT_OK(prefetch_reader->RefreshReadRanges());
    const std::vector<RangeList> expected_per_reader = {
        RangeList{{0, 30}, {90, 101}}, RangeList{{30, 60}}, RangeList{{60, 90}}};
    for (size_t i = 0; i < expected_per_reader.size(); ++i) {
        SCOPED_TRACE("reader index " + std::to_string(i));
        // Fail the assertion instead of crashing if the reader is not the expected mock type.
        auto* mock_reader = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[i].get());
        ASSERT_NE(mock_reader, nullptr);
        ASSERT_EQ(mock_reader->GetReadRanges(), expected_per_reader[i]);
    }
}
// The underlying reader reports need_prefetch=true for its single range [0, 100). Even so,
// creating the prefetch reader with the adaptive strategy enabled (one reader,
// prefetch_batch_count=2) ends with prefetching disabled.
TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRangesDisablePrefetchByAdaptiveStrategy) {
    constexpr int32_t kBatchSize = 10;
    constexpr int32_t kParallelNum = 1;
    auto rows = PrepareArray(200);
    ControlledMockFormatReaderBuilder builder(rows, data_type_, kBatchSize,
                                              /*read_ranges=*/{{0, 100}},
                                              /*need_prefetch=*/true,
                                              /*set_read_ranges_statuses=*/{});
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            /*prefetch_batch_count=*/2,
            /*enable_adaptive_prefetch_strategy=*/true, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    const bool prefetch_enabled = reader->NeedPrefetch();
    ASSERT_FALSE(prefetch_enabled);
}
// SetReadRanges() records the requested ranges, followed by a trailing [400, 401) entry, and
// pushes them down to the underlying readers round-robin.
TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) {
    using RangeList = std::vector<std::pair<uint64_t, uint64_t>>;
    auto data_array = PrepareArray(400);
    int32_t batch_size = 30;
    int32_t prefetch_max_parallel_num = 3;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
            prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
    ASSERT_NE(prefetch_reader, nullptr);
    // Prefetching starts disabled here; force it on (presumably required for SetReadRanges()
    // to dispatch ranges to the prefetch readers).
    ASSERT_FALSE(prefetch_reader->need_prefetch_);
    prefetch_reader->need_prefetch_ = true;
    const RangeList ranges = {{0, 100}, {100, 200}, {200, 300}, {300, 400}};
    ASSERT_OK(prefetch_reader->SetReadRanges(ranges));

    RangeList recorded_ranges;
    for (const auto& range : prefetch_reader->read_ranges_) {
        recorded_ranges.push_back(range);
    }
    // NOTE(review): the extra [400, 401) entry is appended by the implementation (presumably an
    // end-of-data marker); confirm against PrefetchFileBatchReaderImpl::SetReadRanges.
    RangeList expected_recorded = ranges;
    expected_recorded.emplace_back(400, 401);
    ASSERT_EQ(recorded_ranges, expected_recorded);

    const std::vector<RangeList> expected_per_reader = {
        RangeList{{0, 100}, {300, 400}}, RangeList{{100, 200}}, RangeList{{200, 300}}};
    for (size_t i = 0; i < expected_per_reader.size(); ++i) {
        SCOPED_TRACE("reader index " + std::to_string(i));
        // Fail the assertion instead of crashing if the reader is not the expected mock type.
        auto* mock_reader = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[i].get());
        ASSERT_NE(mock_reader, nullptr);
        ASSERT_EQ(mock_reader->GetReadRanges(), expected_per_reader[i]);
    }
}
// The second reader handed out by the builder fails SetReadRanges() with an IOError, and
// PrefetchFileBatchReaderImpl::SetReadRanges() must surface that error to the caller.
TEST_F(PrefetchFileBatchReaderImplTest, SetReadRangesReturnErrorWhenPushDownFailed) {
    constexpr int32_t kBatchSize = 10;
    constexpr int32_t kParallelNum = 2;
    auto rows = PrepareArray(100);
    ControlledMockFormatReaderBuilder builder(
        rows, data_type_, kBatchSize,
        /*read_ranges=*/{{0, 50}, {50, 100}},
        /*need_prefetch=*/true,
        /*set_read_ranges_statuses=*/{Status::OK(), Status::IOError("set read ranges failed")});
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    PrefetchFileBatchReaderImpl* impl = reader.get();
    impl->need_prefetch_ = true;
    const Status pushdown_status = impl->SetReadRanges({{0, 50}, {50, 100}});
    ASSERT_FALSE(pushdown_status.ok());
    ASSERT_TRUE(pushdown_status.IsIOError());
}
// With PrefetchCacheMode::NEVER the reader reports that no read-ahead cache initialization is
// needed.
TEST_F(PrefetchFileBatchReaderImplTest, NeedInitCacheNeverMode) {
    constexpr int32_t kBatchSize = 5;
    constexpr int32_t kParallelNum = 1;
    auto rows = PrepareArray(10);
    MockFormatReaderBuilder builder(rows, data_type_, kBatchSize);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::NEVER,
            CacheConfig(), GetDefaultPool()));
    PrefetchFileBatchReaderImpl* impl = reader.get();
    ASSERT_FALSE(impl->NeedInitCache());
}
// If the read-ahead cache cannot be initialized, Workloop() must record an Invalid read
// status. Here initialization fails because range_size_limit is one past UINT32_MAX.
TEST_F(PrefetchFileBatchReaderImplTest, WorkloopSetReadStatusWhenCacheInitFailed) {
    constexpr int32_t kBatchSize = 5;
    constexpr int32_t kParallelNum = 1;
    auto rows = PrepareArray(10);
    MockFormatReaderBuilder builder(rows, data_type_, kBatchSize);
    // A range size limit that does not fit in 32 bits.
    const uint64_t too_large_range_limit =
        static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1;
    CacheConfig invalid_cache_config(
        /*buffer_size_limit=*/512 * 1024,
        /*range_size_limit=*/too_large_range_limit,
        /*hole_size_limit=*/8 * 1024,
        /*pre_buffer_limit=*/128 * 1024);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            invalid_cache_config, GetDefaultPool()));
    PrefetchFileBatchReaderImpl* impl = reader.get();
    impl->Workloop();
    const Status read_status = impl->GetReadStatus();
    ASSERT_FALSE(read_status.ok());
    ASSERT_TRUE(read_status.IsInvalid());
}
// Once the reader is flagged as shut down, DoReadBatch() returns OK without reporting an error.
TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenShutdown) {
    constexpr int32_t kBatchSize = 5;
    constexpr int32_t kParallelNum = 1;
    auto rows = PrepareArray(10);
    MockFormatReaderBuilder builder(rows, data_type_, kBatchSize);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    PrefetchFileBatchReaderImpl* impl = reader.get();
    impl->is_shutdown_ = true;
    ASSERT_OK(impl->DoReadBatch(/*reader_idx=*/0));
}
// When reader 0's group holds no read range, DoReadBatch(0) returns OK.
TEST_F(PrefetchFileBatchReaderImplTest, DoReadBatchReturnOkWhenNoCurrentReadRange) {
    constexpr int32_t kBatchSize = 5;
    constexpr int32_t kParallelNum = 1;
    auto rows = PrepareArray(10);
    MockFormatReaderBuilder builder(rows, data_type_, kBatchSize);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/false, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    PrefetchFileBatchReaderImpl* impl = reader.get();
    // A single group that contains no ranges.
    impl->read_ranges_in_group_ = {{}};
    ASSERT_OK(impl->DoReadBatch(/*reader_idx=*/0));
}
// The batch size (150) exceeds the total row count (101). All rows must still be read back in
// order.
TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) {
    constexpr int32_t kBatchSize = 150;
    constexpr int32_t kParallelNum = 3;
    auto rows = PrepareArray(101);
    MockFormatReaderBuilder builder(rows, data_type_, kBatchSize);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
    ASSERT_OK_AND_ASSIGN(auto collected,
                         ReadResultCollector::CollectResult(
                             reader.get(), /*max simulated data processing time*/ 100));
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101);
    ASSERT_TRUE(collected->Equals(std::make_shared<arrow::ChunkedArray>(rows)));
}
// One of three prefetch readers starts failing after the first batch. Batches already
// prefetched are still delivered, and the error surfaces once the failing reader's next range
// is needed.
// NOTE(review): the sequencing relies on usleep() and may be timing-sensitive on slow machines.
TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) {
    auto data_array = PrepareArray(101);
    int32_t batch_size = 10;
    int32_t prefetch_max_parallel_num = 3;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
            prefetch_max_parallel_num, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    auto prefetch_reader = dynamic_cast<PrefetchFileBatchReaderImpl*>(reader.get());
    ASSERT_NE(prefetch_reader, nullptr);
    // Disable the mock's batch-size randomization so batch boundaries match the comments below.
    for (int32_t i = 0; i < prefetch_max_parallel_num; i++) {
        auto* mock_reader = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[i].get());
        ASSERT_NE(mock_reader, nullptr);
        mock_reader->EnableRandomizeBatchSize(false);
    }
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
    ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, reader->NextBatchWithBitmap());
    auto& [batch, bitmap] = batch_with_bitmap;
    ASSERT_EQ(batch.first->length, bitmap.Cardinality());
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
    // GetArray() takes ownership of the batch; the array itself is not needed further.
    ASSERT_OK_AND_ASSIGN(auto array, ReadResultCollector::GetArray(std::move(batch)));
    ASSERT_TRUE(array);
    ASSERT_OK(prefetch_reader->GetReadStatus());
    usleep(100000);  // sleep 100ms to ensure that the other data has been pushed
    ASSERT_TRUE(HasValue(prefetch_reader->prefetch_queues_));
    // Set IOError for reader[1] after the first NextBatch().
    // Now the data in prefetch_queues_[0] is [30,39], prefetch_queues_[1] is [10,19],
    // prefetch_queues_[2] is [20,29],
    // So, the IOError will occur at [40,49].
    auto* failing_reader = dynamic_cast<MockFileBatchReader*>(prefetch_reader->readers_[1].get());
    ASSERT_NE(failing_reader, nullptr);
    failing_reader->SetNextBatchStatus(Status::IOError("mock error"));
    usleep(100000);
    // pop [10,19]
    ASSERT_OK_AND_ASSIGN(batch_with_bitmap, reader->NextBatchWithBitmap());
    // Release the popped batch immediately so a failing assertion below cannot leak it.
    ReaderUtils::ReleaseReadBatch(std::move(batch_with_bitmap.first));
    // now reader1 fetch [40,49] and set error status.
    usleep(100000);
    ASSERT_NOK(reader->NextBatchWithBitmap());
}
// Every underlying reader fails with an IOError. NextBatchWithBitmap() must then:
// - surface the error without advancing the row number;
// - leave the reader not shut down, with a failed read status and no queued prefetch data;
// - keep returning the same error on a second call.
TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) {
    constexpr int32_t kBatchSize = 10;
    constexpr int32_t kParallelNum = 3;
    auto rows = PrepareArray(101);
    MockFormatReaderBuilder builder(rows, data_type_, kBatchSize);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    PrefetchFileBatchReaderImpl* impl = reader.get();
    for (int32_t idx = 0; idx < kParallelNum; ++idx) {
        dynamic_cast<MockFileBatchReader*>(impl->readers_[idx].get())
            ->SetNextBatchStatus(Status::IOError("mock error"));
    }
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);

    auto first_attempt = reader->NextBatchWithBitmap();
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
    ASSERT_FALSE(first_attempt.ok());
    ASSERT_TRUE(first_attempt.status().IsIOError());
    ASSERT_FALSE(impl->is_shutdown_);
    ASSERT_NOK(impl->GetReadStatus());
    ASSERT_FALSE(HasValue(impl->prefetch_queues_));

    // The failure is sticky: a second call reports the same kind of error.
    auto second_attempt = reader->NextBatchWithBitmap();
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
    ASSERT_FALSE(second_attempt.ok());
    ASSERT_TRUE(second_attempt.status().IsIOError());
}
// With an empty input, collecting the results yields no array at all. The previous-batch row
// number moves from -1 to 0.
TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithEmptyData) {
    constexpr int32_t kBatchSize = 10;
    constexpr int32_t kParallelNum = 3;
    auto empty_rows = PrepareArray(0);
    MockFormatReaderBuilder builder(empty_rows, data_type_, kBatchSize);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
    ASSERT_OK_AND_ASSIGN(auto collected,
                         ReadResultCollector::CollectResult(
                             reader.get(), /*max simulated data processing time*/ 100));
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0);
    ASSERT_FALSE(collected);
}
// After the reader has been fully drained, another NextBatchWithBitmap() call succeeds and
// returns an EOF batch.
TEST_F(PrefetchFileBatchReaderImplTest, TestCallNextBatchAfterReadingEof) {
    constexpr int32_t kBatchSize = 10;
    constexpr int32_t kParallelNum = 6;
    auto rows = PrepareArray(10);
    MockFormatReaderBuilder builder(rows, data_type_, kBatchSize);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, kParallelNum, kBatchSize,
            kParallelNum * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
    ASSERT_OK_AND_ASSIGN(auto collected,
                         ReadResultCollector::CollectResult(
                             reader.get(), /*max simulated data processing time*/ 100));
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 10);
    ASSERT_TRUE(collected->Equals(std::make_shared<arrow::ChunkedArray>(rows)));
    // Reading past the end yields an EOF batch rather than an error.
    ASSERT_OK_AND_ASSIGN(auto after_eof, reader->NextBatchWithBitmap());
    ASSERT_TRUE(BatchReader::IsEofBatch(after_eof));
}
// Creating a prefetch reader with initialize_read_ranges=true and destroying it without ever
// calling NextBatchWithBitmap() must work cleanly.
TEST_F(PrefetchFileBatchReaderImplTest, TestCreateReaderWithoutNextBatch) {
    auto data_array = PrepareArray(101);
    int32_t batch_size = 10;
    int32_t prefetch_max_parallel_num = 3;
    MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size);
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size,
            prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    ASSERT_TRUE(reader);
    // Destroy explicitly (while reader_builder is still alive) so the teardown-without-reading
    // path is the visible point of this test.
    reader.reset();
}
/// Create() must reject invalid arguments. A reader built from valid arguments must
/// reject SeekToRow(), which the prefetch reader does not support.
TEST_F(PrefetchFileBatchReaderImplTest, TestInvalidCase) {
    const std::string empty_path = "";
    const int32_t batch_size = 10;
    const int32_t parallel_num = 3;
    auto data_array = PrepareArray(101);
    MockFormatReaderBuilder builder(data_array, data_type_, batch_size);

    // Each of the following scopes passes an invalid argument to Create() and expects
    // it to fail.
    {
        // Zero prefetch parallelism.
        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
            empty_path, &builder, mock_fs_, /*prefetch_max_parallel_num=*/0, batch_size, 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    }
    {
        // Negative batch size.
        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
            empty_path, &builder, mock_fs_, parallel_num, /*batch_size=*/-1, parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    }
    {
        // Missing executor.
        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
            empty_path, &builder, mock_fs_, parallel_num, batch_size, parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, /*executor=*/nullptr,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    }
    {
        // Missing reader builder.
        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
            empty_path, /*reader_builder=*/nullptr, mock_fs_, parallel_num, batch_size,
            parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    }
    {
        // Missing file system.
        ASSERT_NOK(PrefetchFileBatchReaderImpl::Create(
            empty_path, &builder, /*fs=*/nullptr, parallel_num, batch_size, parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    }
    {
        // Valid arguments: creation succeeds, but seeking is not supported.
        ASSERT_OK_AND_ASSIGN(
            auto reader,
            PrefetchFileBatchReaderImpl::Create(
                empty_path, &builder, mock_fs_, parallel_num, batch_size, parallel_num * 2,
                /*enable_adaptive_prefetch_strategy=*/false, executor_,
                /*initialize_read_ranges=*/true,
                /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(),
                GetDefaultPool()));
        ASSERT_NOK_WITH_MSG(reader->SeekToRow(/*row_number=*/101),
                            "not support seek to row for prefetch reader");
    }
}
/// The file holds three stripes: [0,30), [30,60) and [60,90). Predicate pushdown filters
/// out the whole middle stripe [30,60), although the read ranges are still [0,30),
/// [30,60) and [60,90). The expected result is therefore [0,30) followed by [60,90).
TEST_P(PrefetchFileBatchReaderImplTest, TestPrefetchWithPredicatePushdownWithCompleteFiltering) {
    auto [file_format, cache_mode] = GetParam();
    auto data_array = PrepareArray(90);
    PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/30);
    auto schema = arrow::schema(fields_);

    // Pushed-down predicate: f1 < 20 OR f1 > 70.
    ASSERT_OK_AND_ASSIGN(auto predicate,
                         PredicateBuilder::Or({
                             PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
                                                        FieldType::BIGINT, Literal(20L)),
                             PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
                                                           FieldType::BIGINT, Literal(70L)),
                         }));
    auto reader = PreparePrefetchReader(file_format, schema.get(), predicate,
                                        /*selection_bitmap=*/std::nullopt,
                                        /*batch_size=*/10, /*prefetch_max_parallel_num=*/3,
                                        cache_mode);

    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
    ASSERT_OK_AND_ASSIGN(auto result_array,
                         ReadResultCollector::CollectResult(
                             reader.get(), /*max simulated data processing time*/ 100));
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90);

    // The surviving stripes come back whole: rows [0,30) then rows [60,90).
    auto expected_array = std::make_shared<arrow::ChunkedArray>(
        arrow::ArrayVector{data_array->Slice(0, 30), data_array->Slice(60, 30)});
    ASSERT_TRUE(CheckEqual(expected_array, result_array));
}
/// The file holds three stripes, [0,30), [30,60) and [60,90), each split into three row
/// groups of 10 rows (row_index_stride=10). After predicate pushdown only the row groups
/// covering [0,20) and [70,90) remain, while the read ranges are still [0,30), [30,60)
/// and [60,90). Despite "Orc" in its name, the test runs for every parameterized format.
TEST_P(PrefetchFileBatchReaderImplTest,
       TestPrefetchWithOrcPredicatePushdownWithRowGroupGranularity) {
    auto [file_format, cache_mode] = GetParam();
    auto data_array = PrepareArray(90);
    PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/10);
    auto schema = arrow::schema(fields_);

    // Pushed-down predicate: f1 < 20 OR f1 > 70.
    ASSERT_OK_AND_ASSIGN(auto predicate,
                         PredicateBuilder::Or({
                             PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
                                                        FieldType::BIGINT, Literal(20L)),
                             PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
                                                           FieldType::BIGINT, Literal(70L)),
                         }));
    auto reader = PreparePrefetchReader(file_format, schema.get(), predicate,
                                        /*selection_bitmap=*/std::nullopt,
                                        /*batch_size=*/10, /*prefetch_max_parallel_num=*/3,
                                        cache_mode);
    ASSERT_OK(reader->RefreshReadRanges());

    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), -1);
    ASSERT_OK_AND_ASSIGN(auto result_array,
                         ReadResultCollector::CollectResult(
                             reader.get(), /*max simulated data processing time*/ 100));
    ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90);

    // Only the matching row groups come back: rows [0,20) then rows [70,90).
    auto expected_array = std::make_shared<arrow::ChunkedArray>(
        arrow::ArrayVector{data_array->Slice(0, 20), data_array->Slice(70, 20)});
    ASSERT_TRUE(CheckEqual(expected_array, result_array));
}
/// Reads 10000 rows through the prefetch reader with a random selection bitmap and
/// checks the output equals applying that same bitmap directly to the full batch.
TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithBitmap) {
    auto data_array = PrepareArray(10000);

    // 5120 random draws over the valid row-id range. The set drops duplicates and
    // keeps the ids sorted.
    const auto last_row_id = data_array->length() - 1;
    std::set<int32_t> selected_rows;
    for (int32_t draw = 0; draw < 5120; ++draw) {
        selected_rows.insert(paimon::test::RandomNumber(0, last_row_id));
    }
    std::vector<int32_t> selected_row_list(selected_rows.begin(), selected_rows.end());
    auto bitmap = RoaringBitmap32::From(selected_row_list);

    MockFormatReaderBuilder builder(data_array, data_type_, bitmap,
                                    /*read_batch_size=*/100);
    const int32_t parallel_num = 3;
    ASSERT_OK_AND_ASSIGN(
        auto reader,
        PrefetchFileBatchReaderImpl::Create(
            /*data_file_path=*/"", &builder, mock_fs_, parallel_num,
            /*batch_size=*/100, parallel_num * 2,
            /*enable_adaptive_prefetch_strategy=*/false, executor_,
            /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS,
            CacheConfig(), GetDefaultPool()));
    ASSERT_OK_AND_ASSIGN(auto actual_chunks, ReadResultCollector::CollectResult(reader.get()));

    // Expected result: the whole data set with the bitmap applied in one step.
    ASSERT_OK_AND_ASSIGN(auto full_batch, ReadResultCollector::GetReadBatch(data_array));
    ASSERT_OK_AND_ASSIGN(auto filtered_batch,
                         ReaderUtils::ApplyBitmapToReadBatch(
                             std::make_pair(std::move(full_batch), bitmap),
                             arrow::default_memory_pool()));
    ASSERT_OK_AND_ASSIGN(auto filtered_array,
                         ReadResultCollector::GetArray(std::move(filtered_batch)));
    auto expected_chunks = std::make_shared<arrow::ChunkedArray>(filtered_array);
    ASSERT_TRUE(actual_chunks->Equals(expected_chunks));
}
} // namespace paimon::test