blob: 37a415dea24c6f644da2d615a5eeac13baa125c0 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "paimon/common/reader/concat_batch_reader.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include <numeric>
#include <string>
#include <utility>
#include <vector>
#include "arrow/api.h"
#include "arrow/array/array_base.h"
#include "arrow/array/array_nested.h"
#include "arrow/ipc/json_simple.h"
#include "gtest/gtest.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/status.h"
#include "paimon/testing/mock/mock_file_batch_reader.h"
#include "paimon/testing/utils/read_result_collector.h"
#include "paimon/testing/utils/testharness.h"
#include "paimon/utils/roaring_bitmap32.h"
namespace paimon::test {
class ConcatBatchReaderTest : public ::testing::Test {
void SetUp() override {
pool_ = GetDefaultPool();
}
void CheckResult(const std::vector<std::string>& batches, const std::string& expected) {
std::vector<std::pair<std::string, std::vector<int32_t>>> batches_with_bitmap;
for (const auto& batch_str : batches) {
int32_t row_count = std::count(batch_str.begin(), batch_str.end(), ',');
if (batch_str != "[]") {
row_count += 1;
}
std::vector<int32_t> bitmap_data;
for (int32_t i = 0; i < row_count; i++) {
bitmap_data.push_back(i);
}
batches_with_bitmap.emplace_back(batch_str, bitmap_data);
}
return CheckResult(batches_with_bitmap, expected);
}
void CheckResult(const std::vector<std::pair<std::string, std::vector<int32_t>>>& batches,
const std::string& expected) {
for (const auto& batch_size : {1, 2, 4, 8}) {
std::vector<std::unique_ptr<BatchReader>> readers;
for (const auto& [batch_str, bitmap_data] : batches) {
auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), batch_str)
.ValueOrDie();
std::shared_ptr<arrow::Array> data =
arrow::StructArray::Make({f1}, {arrow::field("f1", arrow::int32())})
.ValueOrDie();
auto reader = std::make_unique<MockFileBatchReader>(
data, data->type(), RoaringBitmap32::From(bitmap_data), batch_size);
readers.push_back(std::move(reader));
}
auto concat_reader = std::make_unique<ConcatBatchReader>(std::move(readers), pool_);
ASSERT_OK_AND_ASSIGN(auto result_chunk_array,
ReadResultCollector::CollectResult(concat_reader.get()));
if (expected.empty()) {
ASSERT_FALSE(result_chunk_array);
return;
}
auto expected_f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), expected).ValueOrDie();
std::shared_ptr<arrow::Array> expected_array =
arrow::StructArray::Make({expected_f1}, {arrow::field("f1", arrow::int32())})
.ValueOrDie();
auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
ASSERT_TRUE(expected_chunk_array->Equals(result_chunk_array))
<< result_chunk_array->ToString();
}
}
private:
std::shared_ptr<MemoryPool> pool_;
};
// Concatenation with every row selected: the output must be the input batches in
// reader order, and empty readers must not affect it wherever they appear.
TEST_F(ConcatBatchReaderTest, TestSimple) {
    using Batches = std::vector<std::string>;
    const std::string all_rows = "[10, 11, 12, 13, 14, 16, 17, 20, 24, 100]";

    // single reader
    CheckResult(Batches{"[10, 11, 12, 13, 14]"}, "[10, 11, 12, 13, 14]");

    // several non-empty readers
    CheckResult(Batches{"[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]"}, all_rows);

    // empty reader at the front
    CheckResult(Batches{"[]", "[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]"},
                all_rows);

    // empty reader in the middle
    CheckResult(Batches{"[10, 11, 12, 13, 14]", "[]", "[16, 17, 20]", "[24]", "[100]"},
                all_rows);

    // empty reader at the back
    CheckResult(Batches{"[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]", "[]"},
                all_rows);

    // no data in reader
    CheckResult(Batches{"[]"}, "");

    // no reader
    CheckResult(Batches{}, "");
}
// Concatenation where each reader carries its own row-selection bitmap: only the
// listed row indices of each batch may appear in the output, in reader order.
TEST_F(ConcatBatchReaderTest, TestSimpleWithBitmap) {
    using SourceBatches = std::vector<std::pair<std::string, std::vector<int32_t>>>;
    const std::string selected_rows = "[11, 12, 13, 16, 20, 100]";

    // single reader, index 2 is not selected
    const SourceBatches single_reader = {{"[10, 11, 12, 13, 14]", {0, 1, 3, 4}}};
    CheckResult(single_reader, "[10, 11, 13, 14]");

    // several readers, one of them ("[24]") with nothing selected
    const SourceBatches several_readers = {{"[10, 11, 12, 13, 14]", {1, 2, 3}},
                                           {"[16, 17, 20]", {0, 2}},
                                           {"[24]", {}},
                                           {"[100]", {0}}};
    CheckResult(several_readers, selected_rows);

    // empty reader at the front
    const SourceBatches empty_first = {{"[]", {}},
                                       {"[10, 11, 12, 13, 14]", {1, 2, 3}},
                                       {"[16, 17, 20]", {0, 2}},
                                       {"[24]", {}},
                                       {"[100]", {0}}};
    CheckResult(empty_first, selected_rows);

    // empty reader in the middle
    const SourceBatches empty_middle = {{"[10, 11, 12, 13, 14]", {1, 2, 3}},
                                        {"[]", {}},
                                        {"[16, 17, 20]", {0, 2}},
                                        {"[24]", {}},
                                        {"[100]", {0}}};
    CheckResult(empty_middle, selected_rows);

    // empty reader at the back
    const SourceBatches empty_last = {{"[10, 11, 12, 13, 14]", {1, 2, 3}},
                                      {"[16, 17, 20]", {0, 2}},
                                      {"[24]", {}},
                                      {"[100]", {0}},
                                      {"[]", {}}};
    CheckResult(empty_last, selected_rows);

    // no data in reader
    const SourceBatches only_empty_readers = {{"[]", {}}, {"[]", {}}};
    CheckResult(only_empty_readers, "");

    // no reader
    const SourceBatches no_readers = {};
    CheckResult(no_readers, "");
}
} // namespace paimon::test