// blob: bd26f18721392fd89d6a6a013283604e6e1dd8da
/*
* Copyright 2024-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "arrow/api.h"
#include "arrow/array/array_base.h"
#include "arrow/c/abi.h"
#include "arrow/ipc/json_simple.h"
#include "arrow/util/string_builder.h"
#include "gtest/gtest.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/factories/io_hook.h"
#include "paimon/common/file_index/bitmap/bitmap_file_index_factory.h"
#include "paimon/common/file_index/bloomfilter/bloom_filter_file_index_factory.h"
#include "paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_factory.h"
#include "paimon/common/table/special_fields.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/core/io/data_file_meta.h"
#include "paimon/core/manifest/file_source.h"
#include "paimon/core/stats/simple_stats.h"
#include "paimon/core/table/source/data_split_impl.h"
#include "paimon/core/table/source/deletion_file.h"
#include "paimon/data/timestamp.h"
#include "paimon/defs.h"
#include "paimon/factories/factory_creator.h"
#include "paimon/memory/bytes.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/metrics.h"
#include "paimon/predicate/literal.h"
#include "paimon/predicate/predicate_builder.h"
#include "paimon/read_context.h"
#include "paimon/reader/batch_reader.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/table/source/table_read.h"
#include "paimon/testing/utils/io_exception_helper.h"
#include "paimon/testing/utils/read_result_collector.h"
#include "paimon/testing/utils/testharness.h"
namespace paimon {
class DataSplit;
class Predicate;
} // namespace paimon
namespace paimon::test {
class ReadInteWithIndexTest : public testing::Test,
public ::testing::WithParamInterface<std::pair<std::string, bool>> {
public:
void SetUp() override {
pool_ = GetDefaultPool();
}
// Fixture tear-down: intentionally empty.
void TearDown() override {
}
// Reads `splits` from the table at `table_path` with `predicate` applied and compares the
// collected rows against `expected_array`.
//
// The read context always uses "read.batch-size" = "2" and turns the adaptive prefetch
// strategy off; prefetching (batch count 3) is enabled when the test parameter requests it.
//
// table_path:     table path handed to ReadContextBuilder.
// splits:         splits handed to TableRead::CreateReader.
// predicate:      predicate set on the read context; nullptr is passed for unfiltered reads.
// expected_array: rows the read must produce, or nullptr when the read must yield no result.
void CheckResult(const std::string& table_path,
                 const std::vector<std::shared_ptr<Split>>& splits,
                 const std::shared_ptr<Predicate>& predicate,
                 const std::shared_ptr<arrow::ChunkedArray>& expected_array) const {
    // Only the prefetch flag of the (file format, prefetch) parameter is needed here.
    const bool enable_prefetch = GetParam().second;
    ReadContextBuilder context_builder(table_path);
    context_builder.AddOption("read.batch-size", "2")
        .AddOption("test.enable-adaptive-prefetch-strategy", "false")
        .SetPredicate(predicate);
    if (enable_prefetch) {
        context_builder.EnablePrefetch(true).SetPrefetchBatchCount(3);
    }
    ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
    ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
    ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(splits));
    ASSERT_OK_AND_ASSIGN(auto result_array,
                         ReadResultCollector::CollectResult(batch_reader.get()));
    if (!expected_array) {
        // No rows expected: the collector must not have produced an array.
        ASSERT_FALSE(result_array);
        return;
    }
    // Fail cleanly instead of dereferencing a null result when the read unexpectedly
    // produced nothing.
    ASSERT_TRUE(result_array) << "the read produced no result although rows were expected";
    ::arrow::PrettyPrintOptions print_option;
    print_option.container_window = 100;
    // On mismatch both arrays are pretty-printed to std::cout; only the returned Status of
    // each PrettyPrint call is streamed into the assertion message itself.
    ASSERT_TRUE(result_array->Equals(*expected_array))
        << "\nActual:" << ::arrow::PrettyPrint(*result_array, print_option, &std::cout)
        << "\nExpected:" << ::arrow::PrettyPrint(*expected_array, print_option, &std::cout);
}
// Runs a fixed list of predicate scenarios against `split` and verifies each one through
// CheckResult.
//
// path:            table path forwarded to CheckResult.
// arrow_data_type: type used to parse the expected rows from JSON. Every expected row below
//                  holds five values; the callers visible in this file build the type from
//                  the value-kind special field followed by f0, f1, f2 and f3.
// split:           the single split that is read in every scenario.
//
// Scenarios that pass nullptr as the expected array require the read to produce no result.
void CheckResultForBitmap(const std::string& path,
const std::shared_ptr<arrow::DataType>& arrow_data_type,
const std::shared_ptr<Split> split) const {
{
// No predicate: all eight rows are expected.
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Bob", 10, 1, 12.1],
[0, "Emily", 10, 0, 13.1],
[0, "Tony", 10, 0, 14.1],
[0, "Lucy", 20, 1, 15.1],
[0, "Bob", 10, 1, 16.1],
[0, "Tony", 20, 0, 17.1],
[0, "Alice", 20, null, 18.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, /*predicate=*/nullptr, expected_array);
}
{
// Equal predicate on f0: f0 == "Alice".
auto predicate =
PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
Literal(FieldType::STRING, "Alice", 5));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Alice", 20, null, 18.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Not-equal predicate on f0: f0 != "Alice".
auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,
Literal(FieldType::STRING, "Alice", 5));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Bob", 10, 1, 12.1],
[0, "Emily", 10, 0, 13.1],
[0, "Tony", 10, 0, 14.1],
[0, "Lucy", 20, 1, 15.1],
[0, "Bob", 10, 1, 16.1],
[0, "Tony", 20, 0, 17.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Equal predicate on f1: f1 == 20.
auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(20));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Lucy", 20, 1, 15.1],
[0, "Tony", 20, 0, 17.1],
[0, "Alice", 20, null, 18.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Equal predicate on f2: f2 == 1.
auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2",
FieldType::INT, Literal(1));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Bob", 10, 1, 12.1],
[0, "Lucy", 20, 1, 15.1],
[0, "Bob", 10, 1, 16.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Is-null predicate on f2: only the row whose f2 is null is expected.
auto predicate =
PredicateBuilder::IsNull(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT);
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 20, null, 18.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Is-not-null predicate on f2: every row except the one whose f2 is null.
auto predicate =
PredicateBuilder::IsNotNull(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT);
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Bob", 10, 1, 12.1],
[0, "Emily", 10, 0, 13.1],
[0, "Tony", 10, 0, 14.1],
[0, "Lucy", 20, 1, 15.1],
[0, "Bob", 10, 1, 16.1],
[0, "Tony", 20, 0, 17.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// In predicate on f0: f0 in ("Alice", "Bob", "Lucy").
auto predicate = PredicateBuilder::In(
/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
{Literal(FieldType::STRING, "Alice", 5), Literal(FieldType::STRING, "Bob", 3),
Literal(FieldType::STRING, "Lucy", 4)});
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Bob", 10, 1, 12.1],
[0, "Lucy", 20, 1, 15.1],
[0, "Bob", 10, 1, 16.1],
[0, "Alice", 20, null, 18.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Not-in predicate on f0: f0 not in ("Alice", "Bob", "Lucy").
auto predicate = PredicateBuilder::NotIn(
/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
{Literal(FieldType::STRING, "Alice", 5), Literal(FieldType::STRING, "Bob", 3),
Literal(FieldType::STRING, "Lucy", 4)});
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Emily", 10, 0, 13.1],
[0, "Tony", 10, 0, 14.1],
[0, "Tony", 20, 0, 17.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// And predicate: f0 == "Alice" AND f1 == 20.
auto f0_predicate =
PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
Literal(FieldType::STRING, "Alice", 5));
auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(20));
ASSERT_OK_AND_ASSIGN(auto predicate,
PredicateBuilder::And({f0_predicate, f1_predicate}));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 20, null, 18.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Or predicate: f0 == "Alice" OR f1 == 20.
auto f0_predicate =
PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
Literal(FieldType::STRING, "Alice", 5));
auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(20));
ASSERT_OK_AND_ASSIGN(auto predicate,
PredicateBuilder::Or({f0_predicate, f1_predicate}));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Lucy", 20, 1, 15.1],
[0, "Tony", 20, 0, 17.1],
[0, "Alice", 20, null, 18.1]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Predicate push down: f1 > 30 matches no row, so no result is expected.
auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(30));
CheckResult(path, {split}, predicate, /*expected_array=*/nullptr);
}
{
// Empty result: f1 == 30 matches no row, so no result is expected.
auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(30));
CheckResult(path, {split}, predicate, /*expected_array=*/nullptr);
}
{
// Early stopping: the second operand of the conjunction (f2 == 6) matches no row, so no
// result is expected.
auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(10));
auto f2_predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2",
FieldType::INT, Literal(6));
auto f0_predicate =
PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
Literal(FieldType::STRING, "Alice", 5));
ASSERT_OK_AND_ASSIGN(auto predicate,
PredicateBuilder::And({f1_predicate, f2_predicate, f0_predicate}));
CheckResult(path, {split}, predicate, /*expected_array=*/nullptr);
}
}
// Runs the shared bitmap scenarios of CheckResultForBitmap and then two extra scenarios in
// which no index takes effect, so the complete table content is expected back.
//
// path:            table path forwarded to CheckResult.
// arrow_data_type: type used to parse the expected rows from JSON.
// split:           the single split that is read in every scenario.
void CheckResultForBitmapWithSingleRowGroup(
    const std::string& path, const std::shared_ptr<arrow::DataType>& arrow_data_type,
    const std::shared_ptr<Split> split) const {
    // Scenarios in which the bitmap index takes effect.
    CheckResultForBitmap(path, arrow_data_type, split);

    // Scenarios in which no index takes effect; both expect every row of the table.
    const std::vector<std::string> every_row_json = {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Bob", 10, 1, 12.1],
[0, "Emily", 10, 0, 13.1],
[0, "Tony", 10, 0, 14.1],
[0, "Lucy", 20, 1, 15.1],
[0, "Bob", 10, 1, 16.1],
[0, "Tony", 20, 0, 17.1],
[0, "Alice", 20, null, 18.1]
])"};
    {
        // Greater-than predicate on f1 (the bitmap index has no effect on it).
        auto f1_greater_than_10 = PredicateBuilder::GreaterThan(
            /*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(10));
        std::shared_ptr<arrow::ChunkedArray> unfiltered_rows;
        auto parse_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(
            arrow_data_type, every_row_json, &unfiltered_rows);
        ASSERT_TRUE(parse_status.ok());
        CheckResult(path, {split}, f1_greater_than_10, unfiltered_rows);
    }
    {
        // Equal predicate on f3 (a column without an index).
        auto f3_equals_14_1 = PredicateBuilder::Equal(
            /*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(14.1));
        std::shared_ptr<arrow::ChunkedArray> unfiltered_rows;
        auto parse_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(
            arrow_data_type, every_row_json, &unfiltered_rows);
        ASSERT_TRUE(parse_status.ok());
        CheckResult(path, {split}, f3_equals_14_1, unfiltered_rows);
    }
}
// Runs a fixed list of predicate scenarios against `split` and verifies each one through
// CheckResult. Presumably used for tables carrying a bit-slice index (BSI); the caller is
// not visible in this part of the file, so confirm against it.
//
// path:            table path forwarded to CheckResult.
// arrow_data_type: type used to parse the expected rows from JSON. Every expected row below
//                  holds six values; the predicates address columns named f1, f2 (INT) and
//                  f4 (TIMESTAMP).
// split:           the single split that is read in every scenario.
void CheckResultForBsi(const std::string& path,
const std::shared_ptr<arrow::DataType>& arrow_data_type,
const std::shared_ptr<Split> split) const {
{
// No predicate: all eight rows are expected.
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null],
[0, "Tony", null, -2, 17.1, 1745542802000123001],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, /*predicate=*/nullptr, expected_array);
}
{
// Is-null predicate on f4: only the row whose f4 is null is expected.
auto predicate = PredicateBuilder::IsNull(/*field_index=*/4, /*field_name=*/"f4",
FieldType::TIMESTAMP);
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Bob", 100, 2, 16.1, null]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Equal predicate on f1: f1 == 100.
auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(100));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 100, 2, 16.1, null]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Not-equal predicate on f2: f2 != -2. The row whose f2 is null is not expected either.
auto predicate = PredicateBuilder::NotEqual(/*field_index=*/2, /*field_name=*/"f2",
FieldType::INT, Literal(-2));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Greater-than predicate on f1: f1 > 100.
auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(100));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Lucy", 500, -1, 15.1, -1764877000]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Greater-or-equal predicate on f2: f2 >= -1.
auto predicate = PredicateBuilder::GreaterOrEqual(
/*field_index=*/2, /*field_name=*/"f2", FieldType::INT, Literal(-1));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Less-than predicate on f4: f4 < Timestamp(1745542802000, 123000).
auto predicate = PredicateBuilder::LessThan(/*field_index=*/4, /*field_name=*/"f4",
FieldType::TIMESTAMP,
Literal(Timestamp(1745542802000l, 123000)));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Less-or-equal predicate on f4. Because the timestamp is normalized to a long in
// microseconds, 123001 ns is approximated to 123 microseconds.
auto predicate = PredicateBuilder::LessOrEqual(
/*field_index=*/4, /*field_name=*/"f4", FieldType::TIMESTAMP,
Literal(Timestamp(1745542802000l, 123001)));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Tony", null, -2, 17.1, 1745542802000123001],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// In predicate on f2: f2 in (-1, 2, -2).
auto predicate =
PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT,
{Literal(-1), Literal(2), Literal(-2)});
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null],
[0, "Tony", null, -2, 17.1, 1745542802000123001]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Not-in predicate on f1: f1 not in (100, 400, 200). The row whose f1 is null is not
// expected either.
auto predicate =
PredicateBuilder::NotIn(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT,
{Literal(100), Literal(400), Literal(200)});
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// And predicate: f1 > 100 AND f2 <= 0.
auto f1_predicate = PredicateBuilder::GreaterThan(
/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, Literal(100));
auto f2_predicate = PredicateBuilder::LessOrEqual(
/*field_index=*/2, /*field_name=*/"f2", FieldType::INT, Literal(0));
ASSERT_OK_AND_ASSIGN(auto predicate,
PredicateBuilder::And({f1_predicate, f2_predicate}));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Lucy", 500, -1, 15.1, -1764877000]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// Or predicate: f2 > 0 OR f4 <= Timestamp(1745542802000, 123000).
auto f2_predicate = PredicateBuilder::GreaterThan(
/*field_index=*/2, /*field_name=*/"f2", FieldType::INT, Literal(0));
auto f4_predicate = PredicateBuilder::LessOrEqual(
/*field_index=*/4, /*field_name=*/"f4", FieldType::TIMESTAMP,
Literal(Timestamp(1745542802000l, 123000)));
ASSERT_OK_AND_ASSIGN(auto predicate,
PredicateBuilder::Or({f2_predicate, f4_predicate}));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null],
[0, "Tony", null, -2, 17.1, 1745542802000123001],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
{
// And predicate over is-not-null: f1, f2 and f4 must all be non-null.
auto f1_predicate =
PredicateBuilder::IsNotNull(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT);
auto f2_predicate =
PredicateBuilder::IsNotNull(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT);
auto f4_predicate = PredicateBuilder::IsNotNull(
/*field_index=*/4, /*field_name=*/"f4", FieldType::TIMESTAMP);
ASSERT_OK_AND_ASSIGN(auto predicate,
PredicateBuilder::And({f1_predicate, f2_predicate, f4_predicate}));
std::shared_ptr<arrow::ChunkedArray> expected_array;
auto array_status =
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000]
])"},
&expected_array);
ASSERT_TRUE(array_status.ok());
CheckResult(path, {split}, predicate, expected_array);
}
}
private:
std::shared_ptr<MemoryPool> pool_;
};
// Builds the (file format, prefetch enabled) combinations the suite is instantiated with.
// "parquet" is always included, first without and then with prefetch; the "orc" pair is
// appended only when PAIMON_ENABLE_ORC is defined.
std::vector<std::pair<std::string, bool>> GetTestValuesForReadInteWithIndexTest() {
    std::vector<std::pair<std::string, bool>> combinations;
    for (const bool with_prefetch : {false, true}) {
        combinations.emplace_back("parquet", with_prefetch);
    }
#ifdef PAIMON_ENABLE_ORC
    for (const bool with_prefetch : {false, true}) {
        combinations.emplace_back("orc", with_prefetch);
    }
#endif
    return combinations;
}
// Instantiates every ReadInteWithIndexTest case once per (file format, prefetch enabled)
// combination returned by GetTestValuesForReadInteWithIndexTest().
INSTANTIATE_TEST_SUITE_P(FileFormatAndEnablePaimonPrefetch, ReadInteWithIndexTest,
                         ::testing::ValuesIn(GetTestValuesForReadInteWithIndexTest()));
// Reads the "append_with_bitmap_no_embedding" table with the predicate f0 == "Alice" and
// checks the returned rows. The data file meta lists "<file_name>.index" as an extra file and
// carries no embedded index. For the ORC format the reader metrics are verified as well.
TEST_P(ReadInteWithIndexTest, TestSimple) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format +
                       "/append_with_bitmap_no_embedding.db/append_with_bitmap_no_embedding/";
    std::string file_name;
    if (file_format == "orc") {
        file_name = "data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc";
    } else if (file_format == "parquet") {
        file_name = "data-783929b2-49d4-4006-a898-194a62e3278d-0.parquet";
    }
    // Row layout of the expected result: value kind followed by f0..f3.
    std::vector<DataField> read_fields = {SpecialFields::ValueKind(),
                                          DataField(0, arrow::field("f0", arrow::utf8())),
                                          DataField(1, arrow::field("f1", arrow::int32())),
                                          DataField(2, arrow::field("f2", arrow::int32())),
                                          DataField(3, arrow::field("f3", arrow::float64()))};
    std::shared_ptr<arrow::DataType> arrow_data_type =
        DataField::ConvertDataFieldsToArrowStructType(read_fields);
    auto data_file_meta = std::make_shared<DataFileMeta>(
        file_name, /*file_size=*/689,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/
        std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/", {data_file_meta});
    ASSERT_OK_AND_ASSIGN(auto split,
                         builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());
    auto predicate =
        PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
                                Literal(FieldType::STRING, "Alice", 5));
    std::shared_ptr<arrow::ChunkedArray> expected_array;
    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Alice", 20, null, 18.1]
])"},
                                                                         &expected_array);
    ASSERT_TRUE(array_status.ok());
    ReadContextBuilder context_builder(path);
    context_builder.AddOption("read.batch-size", "2")
        .AddOption("test.enable-adaptive-prefetch-strategy", "false")
        .AddOption("orc.read.enable-metrics", "true")
        .SetPredicate(predicate);
    if (enable_prefetch) {
        context_builder.EnablePrefetch(true).SetPrefetchBatchCount(3);
    }
    ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
    ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
    ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(split));
    ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult(batch_reader.get()));
    ASSERT_TRUE(result_array);
    ASSERT_TRUE(result_array->Equals(*expected_array));
    // Metrics are only checked for ORC, the format whose metrics were enabled above.
    if (file_format == "orc") {
        auto read_metrics = batch_reader->GetReaderMetrics();
        // Fail cleanly rather than dereferencing a missing metrics object.
        ASSERT_TRUE(read_metrics);
        ASSERT_OK_AND_ASSIGN(uint64_t io_count, read_metrics->GetCounter("orc.read.io.count"));
        ASSERT_GT(io_count, 0);
        ASSERT_OK_AND_ASSIGN(uint64_t latency,
                             read_metrics->GetCounter("orc.read.inclusive.latency.us"));
        ASSERT_GT(latency, 0);
    }
    batch_reader->Close();
}
// Simulates a limited read: with a batch size of 1 and the predicate f0 != "Alice", only the
// first three batches are pulled from the reader before it is closed. Each batch must hold
// exactly one row. For the ORC format the reader metrics are verified after Close().
TEST_P(ReadInteWithIndexTest, TestReadWithLimits) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format +
                       "/append_with_bitmap_no_embedding.db/append_with_bitmap_no_embedding/";
    std::string file_name;
    if (file_format == "orc") {
        file_name = "data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc";
    } else if (file_format == "parquet") {
        file_name = "data-783929b2-49d4-4006-a898-194a62e3278d-0.parquet";
    }
    auto data_file_meta = std::make_shared<DataFileMeta>(
        file_name, /*file_size=*/689,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/
        std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/", {data_file_meta});
    ASSERT_OK_AND_ASSIGN(auto split,
                         builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());
    auto predicate =
        PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
                                   Literal(FieldType::STRING, "Alice", 5));
    ReadContextBuilder context_builder(path);
    // Declare the format that matches the data file chosen for this test parameter.
    context_builder.AddOption(Options::FILE_FORMAT, file_format)
        .AddOption("test.enable-adaptive-prefetch-strategy", "false")
        .AddOption(Options::READ_BATCH_SIZE, "1")
        .AddOption("orc.read.enable-metrics", "true")
        .SetPredicate(predicate);
    if (enable_prefetch) {
        context_builder.EnablePrefetch(true).SetPrefetchBatchCount(3);
    }
    ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
    ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
    ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(split));
    // Simulate a read limit: only the first three batches are consumed.
    for (int32_t i = 0; i < 3; i++) {
        ASSERT_OK_AND_ASSIGN(BatchReader::ReadBatch batch, batch_reader->NextBatch());
        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> array,
                             ReadResultCollector::GetArray(std::move(batch)));
        ASSERT_TRUE(array);
        ASSERT_EQ(array->length(), 1);
    }
    batch_reader->Close();
    // Metrics are only checked for ORC, the format whose metrics were enabled above.
    if (file_format == "orc") {
        auto read_metrics = batch_reader->GetReaderMetrics();
        ASSERT_TRUE(read_metrics);
        ASSERT_OK_AND_ASSIGN(uint64_t io_count, read_metrics->GetCounter("orc.read.io.count"));
        ASSERT_GT(io_count, 0);
        ASSERT_OK_AND_ASSIGN(uint64_t latency,
                             read_metrics->GetCounter("orc.read.inclusive.latency.us"));
        ASSERT_GT(latency, 0);
    }
}
// Reads the "append_with_bitmap" table whose index bytes are carried inside the data file
// meta (embedded_index) rather than in an extra file, then runs the bitmap scenarios of
// CheckResultForBitmapWithSingleRowGroup against the resulting split.
TEST_P(ReadInteWithIndexTest, TestEmbeddingBitmapIndex) {
    const std::string file_format = GetParam().first;
    const std::string table_path =
        GetDataDir() + "/" + file_format + "/append_with_bitmap.db/append_with_bitmap/";
    // Unknown formats leave the data file name empty.
    const std::string data_file_name =
        file_format == "orc"       ? "data-62feb610-c83f-4217-9b50-bbad9cd08eb4-0.orc"
        : file_format == "parquet" ? "data-54b396b6-df2d-4c6a-a0ae-b6a5afb612d7-0.parquet"
                                   : "";

    // Row layout of the expected results: value kind followed by f0..f3.
    const std::vector<DataField> fields = {SpecialFields::ValueKind(),
                                           DataField(0, arrow::field("f0", arrow::utf8())),
                                           DataField(1, arrow::field("f1", arrow::int32())),
                                           DataField(2, arrow::field("f2", arrow::int32())),
                                           DataField(3, arrow::field("f3", arrow::float64()))};
    const std::shared_ptr<arrow::DataType> row_type =
        DataField::ConvertDataFieldsToArrowStructType(fields);

    // Serialized index content that is attached to the data file meta below.
    const std::vector<uint8_t> index_payload = {
        0,   5,   78,  78,  208, 26,  53,  174, 0,   0,   0,   1,   0,   0,   0,   96,  0,   0,
        0,   3,   0,   2,   102, 48,  0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112,
        0,   0,   0,   96,  0,   0,   0,   176, 0,   2,   102, 49,  0,   0,   0,   1,   0,   6,
        98,  105, 116, 109, 97,  112, 0,   0,   1,   16,  0,   0,   0,   102, 0,   2,   102, 50,
        0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112, 0,   0,   1,   118, 0,   0,
        0,   108, 0,   0,   0,   0,   2,   0,   0,   0,   8,   0,   0,   0,   5,   0,   0,   0,
        0,   1,   0,   0,   0,   5,   65,  108, 105, 99,  101, 0,   0,   0,   0,   0,   0,   0,
        85,  0,   0,   0,   5,   0,   0,   0,   5,   65,  108, 105, 99,  101, 0,   0,   0,   0,
        0,   0,   0,   20,  0,   0,   0,   3,   66,  111, 98,  0,   0,   0,   20,  0,   0,   0,
        20,  0,   0,   0,   5,   69,  109, 105, 108, 121, 255, 255, 255, 253, 255, 255, 255, 255,
        0,   0,   0,   4,   76,  117, 99,  121, 255, 255, 255, 251, 255, 255, 255, 255, 0,   0,
        0,   4,   84,  111, 110, 121, 0,   0,   0,   40,  0,   0,   0,   20,  58,  48,  0,   0,
        1,   0,   0,   0,   0,   0,   1,   0,   16,  0,   0,   0,   0,   0,   7,   0,   58,  48,
        0,   0,   1,   0,   0,   0,   0,   0,   1,   0,   16,  0,   0,   0,   1,   0,   5,   0,
        58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   1,   0,   16,  0,   0,   0,   3,   0,
        6,   0,   2,   0,   0,   0,   8,   0,   0,   0,   2,   0,   0,   0,   0,   1,   0,   0,
        0,   10,  0,   0,   0,   0,   0,   0,   0,   28,  0,   0,   0,   2,   0,   0,   0,   10,
        0,   0,   0,   22,  0,   0,   0,   26,  0,   0,   0,   20,  0,   0,   0,   0,   0,   0,
        0,   22,  58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   2,   0,   16,  0,   0,   0,
        4,   0,   6,   0,   7,   0,   58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   4,   0,
        16,  0,   0,   0,   0,   0,   1,   0,   2,   0,   3,   0,   5,   0,   2,   0,   0,   0,
        8,   0,   0,   0,   2,   1,   255, 255, 255, 248, 0,   0,   0,   18,  0,   0,   0,   1,
        0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   28,  0,   0,   0,   2,   0,   0,
        0,   0,   0,   0,   0,   0,   0,   0,   0,   22,  0,   0,   0,   1,   0,   0,   0,   22,
        0,   0,   0,   24,  58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   2,   0,   16,  0,
        0,   0,   2,   0,   3,   0,   6,   0,   58,  48,  0,   0,   1,   0,   0,   0,   0,   0,
        3,   0,   16,  0,   0,   0,   0,   0,   1,   0,   4,   0,   5,   0};
    // Copy the payload into a pool-backed Bytes buffer of the same size.
    auto index_bytes = std::make_shared<Bytes>(index_payload.size(), pool_.get());
    memcpy(index_bytes->data(), index_payload.data(), index_payload.size());

    auto file_meta = std::make_shared<DataFileMeta>(
        data_file_name, /*file_size=*/689,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/std::vector<std::optional<std::string>>(),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/index_bytes, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DataSplitImpl::Builder split_builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                         /*bucket_path=*/table_path + "bucket-0/", {file_meta});
    ASSERT_OK_AND_ASSIGN(
        auto split,
        split_builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());
    CheckResultForBitmapWithSingleRowGroup(table_path, row_type, split);
}
// Reads the append_with_bitmap_v1 table through a split whose single data file carries its
// file index as an embedded blob, then runs the shared single-row-group bitmap checks.
TEST_P(ReadInteWithIndexTest, TestBitmapWithV1) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format;
    path += "/append_with_bitmap_v1.db/append_with_bitmap_v1/";

    // The data file name depends on the format under test; it stays empty for any other format.
    std::string file_name =
        (file_format == "orc")
            ? "data-c29cf741-80ce-4b74-9cf3-e42efef557b3-0.orc"
            : ((file_format == "parquet") ? "data-6b38a150-c99c-4add-a449-a48a5d34e36c-0.parquet"
                                          : "");

    // Read schema: the value-kind special field followed by f0..f3.
    std::vector<DataField> fields = {SpecialFields::ValueKind(),
                                     DataField(0, arrow::field("f0", arrow::utf8())),
                                     DataField(1, arrow::field("f1", arrow::int32())),
                                     DataField(2, arrow::field("f2", arrow::int32())),
                                     DataField(3, arrow::field("f3", arrow::float64()))};
    std::shared_ptr<arrow::DataType> struct_type =
        DataField::ConvertDataFieldsToArrowStructType(fields);

    // Serialized index blob handed to DataFileMeta as embedded_index. The bytes contain the
    // ASCII names "f0", "f1", "f2" and "bitmap" (e.g. 98, 105, 116, 109, 97, 112).
    std::vector<uint8_t> index_blob = {
        0,   5,   78,  78,  208, 26,  53,  174, 0,   0,   0,   1,   0,   0,   0,   96,  0,   0,
        0,   3,   0,   2,   102, 48,  0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112,
        0,   0,   0,   96,  0,   0,   0,   131, 0,   2,   102, 49,  0,   0,   0,   1,   0,   6,
        98,  105, 116, 109, 97,  112, 0,   0,   0,   227, 0,   0,   0,   74,  0,   2,   102, 50,
        0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112, 0,   0,   1,   45,  0,   0,
        0,   76,  0,   0,   0,   0,   1,   0,   0,   0,   8,   0,   0,   0,   5,   0,   0,   0,
        0,   5,   65,  108, 105, 99,  101, 0,   0,   0,   0,   0,   0,   0,   4,   76,  117, 99,
        121, 255, 255, 255, 251, 0,   0,   0,   3,   66,  111, 98,  0,   0,   0,   20,  0,   0,
        0,   5,   69,  109, 105, 108, 121, 255, 255, 255, 253, 0,   0,   0,   4,   84,  111, 110,
        121, 0,   0,   0,   40,  58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   1,   0,   16,
        0,   0,   0,   0,   0,   7,   0,   58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   1,
        0,   16,  0,   0,   0,   1,   0,   5,   0,   58,  48,  0,   0,   1,   0,   0,   0,   0,
        0,   1,   0,   16,  0,   0,   0,   3,   0,   6,   0,   1,   0,   0,   0,   8,   0,   0,
        0,   2,   0,   0,   0,   0,   20,  0,   0,   0,   0,   0,   0,   0,   10,  0,   0,   0,
        22,  58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   2,   0,   16,  0,   0,   0,   4,
        0,   6,   0,   7,   0,   58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   4,   0,   16,
        0,   0,   0,   0,   0,   1,   0,   2,   0,   3,   0,   5,   0,   1,   0,   0,   0,   8,
        0,   0,   0,   2,   1,   255, 255, 255, 248, 0,   0,   0,   0,   0,   0,   0,   0,   0,
        0,   0,   1,   0,   0,   0,   22,  58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   2,
        0,   16,  0,   0,   0,   2,   0,   3,   0,   6,   0,   58,  48,  0,   0,   1,   0,   0,
        0,   0,   0,   3,   0,   16,  0,   0,   0,   0,   0,   1,   0,   4,   0,   5,   0};

    // Copy the blob into a pool-backed Bytes buffer of the same size.
    auto embedded_index = std::make_shared<Bytes>(index_blob.size(), pool_.get());
    std::memcpy(embedded_index->data(), index_blob.data(), index_blob.size());

    // Hand-built meta for the single data file: 8 rows, no extra files, index embedded.
    auto file_meta = std::make_shared<DataFileMeta>(
        file_name,
        /*file_size=*/689,
        /*row_count=*/8,
        /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(),
        /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(),
        /*min_sequence_number=*/0,
        /*max_sequence_number=*/7,
        /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/std::vector<std::optional<std::string>>(),
        /*creation_time=*/Timestamp(0ll, 0),
        /*delete_row_count=*/0,
        /*embedded_index=*/embedded_index, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt,
        /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);

    // One split over bucket-0 holding just that file.
    DataSplitImpl::Builder split_builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                         /*bucket_path=*/path + "bucket-0/", {file_meta});
    ASSERT_OK_AND_ASSIGN(
        auto split,
        split_builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());

    CheckResultForBitmapWithSingleRowGroup(path, struct_type, split);
}
// Reads the append_with_bitmap_no_embedding table: the data file meta has no embedded index
// and instead lists "<data file>.index" in extra_files. Runs the shared single-row-group
// bitmap checks on the resulting split.
TEST_P(ReadInteWithIndexTest, TestNoEmbeddingBitmapIndex) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format;
    path += "/append_with_bitmap_no_embedding.db/append_with_bitmap_no_embedding/";

    // The data file name depends on the format under test; it stays empty for any other format.
    std::string file_name =
        (file_format == "orc")
            ? "data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc"
            : ((file_format == "parquet") ? "data-783929b2-49d4-4006-a898-194a62e3278d-0.parquet"
                                          : "");

    // Read schema: the value-kind special field followed by f0..f3.
    std::vector<DataField> fields = {SpecialFields::ValueKind(),
                                     DataField(0, arrow::field("f0", arrow::utf8())),
                                     DataField(1, arrow::field("f1", arrow::int32())),
                                     DataField(2, arrow::field("f2", arrow::int32())),
                                     DataField(3, arrow::field("f3", arrow::float64()))};
    std::shared_ptr<arrow::DataType> struct_type =
        DataField::ConvertDataFieldsToArrowStructType(fields);

    // Hand-built meta for the single data file: 8 rows, index referenced via extra_files.
    auto file_meta = std::make_shared<DataFileMeta>(
        file_name,
        /*file_size=*/689,
        /*row_count=*/8,
        /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(),
        /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(),
        /*min_sequence_number=*/0,
        /*max_sequence_number=*/7,
        /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0),
        /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt,
        /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);

    // One split over bucket-0 holding just that file.
    DataSplitImpl::Builder split_builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                         /*bucket_path=*/path + "bucket-0/", {file_meta});
    ASSERT_OK_AND_ASSIGN(
        auto split,
        split_builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());

    CheckResultForBitmapWithSingleRowGroup(path, struct_type, split);
}
// Same table and checks as the non-embedded bitmap test, except that the data file meta also
// sets external_path to the file's location under ".../external-path/bucket-0/".
TEST_P(ReadInteWithIndexTest, TestNoEmbeddingBitmapIndexWithExternalPath) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format +
                       "/append_with_bitmap_no_embedding.db/append_with_bitmap_no_embedding/";

    // Pick the data file that belongs to the format under test.
    std::string file_name;
    if (file_format == "orc") {
        file_name = "data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc";
    } else if (file_format == "parquet") {
        file_name = "data-783929b2-49d4-4006-a898-194a62e3278d-0.parquet";
    }

    // Both formats use the same external directory layout; the path is only filled in when a
    // data file was selected above, otherwise it stays empty.
    std::string external_file_path;
    if (!file_name.empty()) {
        external_file_path = GetDataDir() + "/" + file_format +
                             "/append_with_bitmap_no_embedding.db/external-path/bucket-0/" +
                             file_name;
    }

    // Read schema: the value-kind special field followed by f0..f3.
    std::vector<DataField> fields = {SpecialFields::ValueKind(),
                                     DataField(0, arrow::field("f0", arrow::utf8())),
                                     DataField(1, arrow::field("f1", arrow::int32())),
                                     DataField(2, arrow::field("f2", arrow::int32())),
                                     DataField(3, arrow::field("f3", arrow::float64()))};
    std::shared_ptr<arrow::DataType> struct_type =
        DataField::ConvertDataFieldsToArrowStructType(fields);

    // Hand-built meta: index referenced via extra_files, data file located via external_path.
    auto file_meta = std::make_shared<DataFileMeta>(
        file_name,
        /*file_size=*/689,
        /*row_count=*/8,
        /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(),
        /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(),
        /*min_sequence_number=*/0,
        /*max_sequence_number=*/7,
        /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0),
        /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/external_file_path,
        /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);

    // One split over bucket-0 holding just that file.
    DataSplitImpl::Builder split_builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                         /*bucket_path=*/path + "bucket-0/", {file_meta});
    ASSERT_OK_AND_ASSIGN(
        auto split,
        split_builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());

    CheckResultForBitmapWithSingleRowGroup(path, struct_type, split);
}
// Reads the pk_with_bitmap_dv table through a split that combines a data file (with its
// index listed in extra_files) and a deletion file, and checks the rows returned with and
// without predicates. The expected rows below omit keys 0 and 4, matching the deletion
// file's cardinality of 2.
TEST_P(ReadInteWithIndexTest, TestBitmapIndexWithDv) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path =
        GetDataDir() + "/" + file_format + "/pk_with_bitmap_dv.db/pk_with_bitmap_dv/";
    std::string file_name, deletion_file_path;
    // `path` already ends with '/', so sub-paths are appended without a leading slash
    // (same convention as the bucket path below).
    if (file_format == "orc") {
        file_name = "data-0da0dc51-797c-4c82-9b08-269b2ce400dc-0.orc";
        deletion_file_path = path + "index/index-73ec72e9-489e-49d9-a96c-f6db9b56fa07-0";
    } else if (file_format == "parquet") {
        file_name = "data-bfa36b28-40e7-4026-893b-d23116973174-0.parquet";
        deletion_file_path = path + "index/index-daed25d2-8e68-45ba-aaad-ff342666ef27-0";
    }
    // Read schema: the value-kind special field, the key column, then f0..f3.
    std::vector<DataField> read_fields = {SpecialFields::ValueKind(),
                                          DataField(0, arrow::field("key", arrow::int32())),
                                          DataField(1, arrow::field("f0", arrow::utf8())),
                                          DataField(2, arrow::field("f1", arrow::int32())),
                                          DataField(3, arrow::field("f2", arrow::int32())),
                                          DataField(4, arrow::field("f3", arrow::float64()))};
    std::shared_ptr<arrow::DataType> arrow_data_type =
        DataField::ConvertDataFieldsToArrowStructType(read_fields);
    // Hand-built meta for the single data file (level 5, index referenced via extra_files).
    auto data_file_meta = std::make_shared<DataFileMeta>(
        file_name, /*file_size=*/1001,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/5,
        /*extra_files=*/
        std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DeletionFile deletion_file(deletion_file_path,
                               /*offset=*/1, /*length=*/24, /*cardinality=*/2);
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/", {data_file_meta});
    ASSERT_OK_AND_ASSIGN(auto split, builder.WithSnapshot(4)
                                         .WithDataDeletionFiles({deletion_file})
                                         .IsStreaming(false)
                                         .RawConvertible(true)
                                         .Build());
    {
        // test with non predicate
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 1, "Bob", 10, 1, 12.1],
[0, 2, "Emily", 10, 0, 13.1],
[0, 3, "Tony", 10, 0, 14.1],
[0, 5, "Bob", 10, 1, 16.1],
[0, 6, "Tony", 20, 0, 17.1],
[0, 7, "Alice", 20, null, 18.1]
])"},
                                                             &expected_array);
        // Surface the Arrow error text if the expected JSON fails to parse.
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, /*predicate=*/nullptr, expected_array);
    }
    {
        // test equal, Alice with key 0 is removed by dv
        auto predicate =
            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f0", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 7, "Alice", 20, null, 18.1]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test equal, Lucy is removed by dv
        auto predicate =
            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f0", FieldType::STRING,
                                    Literal(FieldType::STRING, "Lucy", 4));
        CheckResult(path, {split}, predicate, /*expected_array=*/nullptr);
    }
    {
        // test or predicate
        auto f0_predicate =
            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f0", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f1",
                                                    FieldType::INT, Literal(10));
        ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({f0_predicate, f1_predicate}));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 1, "Bob", 10, 1, 12.1],
[0, 2, "Emily", 10, 0, 13.1],
[0, 3, "Tony", 10, 0, 14.1],
[0, 5, "Bob", 10, 1, 16.1],
[0, 7, "Alice", 20, null, 18.1]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
}
// Reads the append_with_bitmap_alter_table table through one split that holds two data
// files written under different schema ids (0 and 1), each with its own embedded index blob,
// and checks the rows returned for a range of predicates on the read schema
// (f1, f4, f3, f5). The first blob contains the ASCII column names f0/f1/f2 and the second
// f1/f4/f5.
TEST_P(ReadInteWithIndexTest, TestWithAlterTable) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format +
                       "/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/";
    std::string file_name1, file_name2;
    if (file_format == "orc") {
        file_name1 = "data-68014988-5451-478f-a18a-a1668214cf3d-0.orc";
        file_name2 = "data-a29b7235-760d-4838-881c-39cbef585dd2-0.orc";
    } else if (file_format == "parquet") {
        file_name1 = "data-da667947-0604-4459-973b-9d364c3953a9-0.parquet";
        file_name2 = "data-2efd8477-9fb4-44a6-9a39-901dd5a83b28-0.parquet";
    }
    // Read schema: value kind, then field ids 1, 0, 3, 4 under the names f1, f4, f3, f5.
    std::vector<DataField> read_fields = {SpecialFields::ValueKind(),
                                          DataField(1, arrow::field("f1", arrow::int64())),
                                          DataField(0, arrow::field("f4", arrow::utf8())),
                                          DataField(3, arrow::field("f3", arrow::float64())),
                                          DataField(4, arrow::field("f5", arrow::int32()))};
    std::shared_ptr<arrow::DataType> arrow_data_type =
        DataField::ConvertDataFieldsToArrowStructType(read_fields);
    // Builds a DataFileMeta for `file_name` under `schema_id`, copying `embedded_bytes` into a
    // pool-backed Bytes buffer that is passed as the embedded index.
    auto create_data_file_meta = [&](const std::string& file_name, int32_t schema_id,
                                     const std::vector<uint8_t>& embedded_bytes) {
        auto embedded_index = std::make_shared<Bytes>(embedded_bytes.size(), pool_.get());
        memcpy(embedded_index->data(), reinterpret_cast<const void*>(embedded_bytes.data()),
               embedded_bytes.size());
        return std::make_shared<DataFileMeta>(
            file_name, /*file_size=*/10,
            /*row_count=*/10, /*min_key=*/BinaryRow::EmptyRow(),
            /*max_key=*/BinaryRow::EmptyRow(),
            /*key_stats=*/SimpleStats::EmptyStats(),
            /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
            /*max_sequence_number=*/10, schema_id,
            /*level=*/0,
            /*extra_files=*/std::vector<std::optional<std::string>>(),
            /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
            /*embedded_index=*/embedded_index, FileSource::Append(),
            /*value_stats_cols=*/std::nullopt,
            /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
            /*write_cols=*/std::nullopt);
    };
    // Embedded index blob for the first data file (schema id 0).
    std::vector<uint8_t> embedded_bytes1 = {
        0,   5,   78,  78,  208, 26,  53,  174, 0,   0,   0,   1,   0,   0,   0,   96,  0,   0,
        0,   3,   0,   2,   102, 48,  0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112,
        0,   0,   0,   96,  0,   0,   0,   176, 0,   2,   102, 49,  0,   0,   0,   1,   0,   6,
        98,  105, 116, 109, 97,  112, 0,   0,   1,   16,  0,   0,   0,   102, 0,   2,   102, 50,
        0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112, 0,   0,   1,   118, 0,   0,
        0,   108, 0,   0,   0,   0,   2,   0,   0,   0,   8,   0,   0,   0,   5,   0,   0,   0,
        0,   1,   0,   0,   0,   5,   65,  108, 105, 99,  101, 0,   0,   0,   0,   0,   0,   0,
        85,  0,   0,   0,   5,   0,   0,   0,   5,   65,  108, 105, 99,  101, 0,   0,   0,   0,
        0,   0,   0,   20,  0,   0,   0,   3,   66,  111, 98,  0,   0,   0,   20,  0,   0,   0,
        20,  0,   0,   0,   5,   69,  109, 105, 108, 121, 255, 255, 255, 253, 255, 255, 255, 255,
        0,   0,   0,   4,   76,  117, 99,  121, 255, 255, 255, 251, 255, 255, 255, 255, 0,   0,
        0,   4,   84,  111, 110, 121, 0,   0,   0,   40,  0,   0,   0,   20,  58,  48,  0,   0,
        1,   0,   0,   0,   0,   0,   1,   0,   16,  0,   0,   0,   0,   0,   7,   0,   58,  48,
        0,   0,   1,   0,   0,   0,   0,   0,   1,   0,   16,  0,   0,   0,   1,   0,   5,   0,
        58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   1,   0,   16,  0,   0,   0,   3,   0,
        6,   0,   2,   0,   0,   0,   8,   0,   0,   0,   2,   0,   0,   0,   0,   1,   0,   0,
        0,   10,  0,   0,   0,   0,   0,   0,   0,   28,  0,   0,   0,   2,   0,   0,   0,   10,
        0,   0,   0,   22,  0,   0,   0,   26,  0,   0,   0,   20,  0,   0,   0,   0,   0,   0,
        0,   22,  58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   2,   0,   16,  0,   0,   0,
        4,   0,   6,   0,   7,   0,   58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   4,   0,
        16,  0,   0,   0,   0,   0,   1,   0,   2,   0,   3,   0,   5,   0,   2,   0,   0,   0,
        8,   0,   0,   0,   2,   1,   255, 255, 255, 248, 0,   0,   0,   18,  0,   0,   0,   1,
        0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   28,  0,   0,   0,   2,   0,   0,
        0,   0,   0,   0,   0,   0,   0,   0,   0,   22,  0,   0,   0,   1,   0,   0,   0,   22,
        0,   0,   0,   24,  58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   2,   0,   16,  0,
        0,   0,   2,   0,   3,   0,   6,   0,   58,  48,  0,   0,   1,   0,   0,   0,   0,   0,
        3,   0,   16,  0,   0,   0,   0,   0,   1,   0,   4,   0,   5,   0};
    // Embedded index blob for the second data file (schema id 1).
    std::vector<uint8_t> embedded_bytes2 = {
        0,   5,   78,  78,  208, 26,  53,  174, 0,   0,   0,   1,   0,   0,   0,   96,  0,   0,
        0,   3,   0,   2,   102, 49,  0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112,
        0,   0,   0,   96,  0,   0,   0,   102, 0,   2,   102, 52,  0,   0,   0,   1,   0,   6,
        98,  105, 116, 109, 97,  112, 0,   0,   0,   198, 0,   0,   0,   101, 0,   2,   102, 53,
        0,   0,   0,   1,   0,   6,   98,  105, 116, 109, 97,  112, 0,   0,   1,   43,  0,   0,
        0,   82,  0,   0,   0,   0,   2,   0,   0,   0,   4,   0,   0,   0,   3,   0,   0,   0,
        0,   1,   0,   0,   0,   0,   0,   0,   0,   10,  0,   0,   0,   0,   0,   0,   0,   52,
        0,   0,   0,   3,   0,   0,   0,   0,   0,   0,   0,   10,  255, 255, 255, 253, 255, 255,
        255, 255, 0,   0,   0,   0,   0,   0,   0,   20,  255, 255, 255, 254, 255, 255, 255, 255,
        0,   0,   0,   0,   0,   0,   0,   30,  0,   0,   0,   0,   0,   0,   0,   20,  58,  48,
        0,   0,   1,   0,   0,   0,   0,   0,   1,   0,   16,  0,   0,   0,   0,   0,   3,   0,
        2,   0,   0,   0,   4,   0,   0,   0,   4,   0,   0,   0,   0,   1,   0,   0,   0,   5,
        65,  108, 105, 99,  101, 0,   0,   0,   0,   0,   0,   0,   70,  0,   0,   0,   4,   0,
        0,   0,   5,   65,  108, 105, 99,  101, 255, 255, 255, 255, 255, 255, 255, 255, 0,   0,
        0,   3,   66,  111, 98,  255, 255, 255, 253, 255, 255, 255, 255, 0,   0,   0,   5,   68,
        97,  118, 105, 100, 255, 255, 255, 252, 255, 255, 255, 255, 0,   0,   0,   5,   69,  109,
        105, 108, 121, 255, 255, 255, 254, 255, 255, 255, 255, 2,   0,   0,   0,   4,   0,   0,
        0,   2,   1,   255, 255, 255, 252, 0,   0,   0,   18,  0,   0,   0,   1,   0,   0,   0,
        100, 0,   0,   0,   0,   0,   0,   0,   28,  0,   0,   0,   2,   0,   0,   0,   100, 0,
        0,   0,   0,   0,   0,   0,   20,  0,   0,   0,   101, 255, 255, 255, 254, 255, 255, 255,
        255, 58,  48,  0,   0,   1,   0,   0,   0,   0,   0,   1,   0,   16,  0,   0,   0,   0,
        0,   2,   0};
    auto data_file_meta1 = create_data_file_meta(file_name1, 0, embedded_bytes1);
    auto data_file_meta2 = create_data_file_meta(file_name2, 1, embedded_bytes2);
    // `path` already ends with '/', so the bucket directory is appended without a leading
    // slash, matching the other tests in this file.
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/",
                                   {data_file_meta1, data_file_meta2});
    ASSERT_OK_AND_ASSIGN(auto split,
                         builder.WithSnapshot(2).IsStreaming(false).RawConvertible(true).Build());
    {
        // test with non predicate
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 20, "Lucy", 15.1, null],
[0, 10, "Bob", 16.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Alice", 18.1, null],
[0, 30, "Alice", 21.1, 100],
[0, 20, "Emily", 22.1, 101],
[0, 10, "Bob", 23.1, 100],
[0, 30, "David", 24.1, null]
])"},
                                                             &expected_array);
        // Surface the Arrow error text if the expected JSON fails to parse.
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, /*predicate=*/nullptr, expected_array);
    }
    {
        // test equal predicate for f1
        auto predicate = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f1",
                                                 FieldType::BIGINT, Literal(10l));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 10, "Bob", 16.1, null],
[0, 10, "Bob", 23.1, 100]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test not equal predicate for f1
        auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f1",
                                                    FieldType::BIGINT, Literal(10l));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 20, "Lucy", 15.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Alice", 18.1, null],
[0, 30, "Alice", 21.1, 100],
[0, 20, "Emily", 22.1, 101],
[0, 30, "David", 24.1, null]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test equal predicate for f4
        auto predicate =
            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f4", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 20, "Alice", 18.1, null],
[0, 30, "Alice", 21.1, 100]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test not equal predicate for f4
        auto predicate =
            PredicateBuilder::NotEqual(/*field_index=*/1, /*field_name=*/"f4", FieldType::STRING,
                                       Literal(FieldType::STRING, "Alice", 5));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 20, "Lucy", 15.1, null],
[0, 10, "Bob", 16.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Emily", 22.1, 101],
[0, 10, "Bob", 23.1, 100],
[0, 30, "David", 24.1, null]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test equal predicate for f3, only do predicate push down
        auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f3",
                                                 FieldType::DOUBLE, Literal(14.1));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 20, "Lucy", 15.1, null],
[0, 10, "Bob", 16.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Alice", 18.1, null]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test is null predicate for f5
        auto predicate =
            PredicateBuilder::IsNull(/*field_index=*/3, /*field_name=*/"f5", FieldType::INT);
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 20, "Lucy", 15.1, null],
[0, 10, "Bob", 16.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Alice", 18.1, null],
[0, 30, "David", 24.1, null]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test is_not_null predicate for f5
        // recall result contains the whole data of the first file, as f5 does not exist in the
        // first data file (the predicate is removed when reading that file)
        auto predicate =
            PredicateBuilder::IsNotNull(/*field_index=*/3, /*field_name=*/"f5", FieldType::INT);
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 20, "Lucy", 15.1, null],
[0, 10, "Bob", 16.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Alice", 18.1, null],
[0, 30, "Alice", 21.1, 100],
[0, 20, "Emily", 22.1, 101],
[0, 10, "Bob", 23.1, 100]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test greater than predicate for f1, do not take effective in bitmap index
        auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"f1",
                                                       FieldType::BIGINT, Literal(10l));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 20, "Lucy", 15.1, null],
[0, 10, "Bob", 16.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Alice", 18.1, null],
[0, 30, "Alice", 21.1, 100],
[0, 20, "Emily", 22.1, 101],
[0, 10, "Bob", 23.1, 100],
[0, 30, "David", 24.1, null]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test in predicate
        auto predicate =
            PredicateBuilder::In(/*field_index=*/0, /*field_name=*/"f1", FieldType::BIGINT,
                                 {Literal(10l), Literal(30l), Literal(50l)});
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 10, "Bob", 16.1, null],
[0, 30, "Alice", 21.1, 100],
[0, 10, "Bob", 23.1, 100],
[0, 30, "David", 24.1, null]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test not in predicate
        auto predicate =
            PredicateBuilder::NotIn(/*field_index=*/0, /*field_name=*/"f1", FieldType::BIGINT,
                                    {Literal(10l), Literal(30l), Literal(50l)});
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 20, "Lucy", 15.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Alice", 18.1, null],
[0, 20, "Emily", 22.1, 101]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test and predicate
        auto f4_predicate =
            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f4", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f1",
                                                    FieldType::BIGINT, Literal(30l));
        ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({f4_predicate, f1_predicate}));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 30, "Alice", 21.1, 100]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test or predicate
        auto f4_predicate =
            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f4", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f1",
                                                    FieldType::BIGINT, Literal(30l));
        ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({f4_predicate, f1_predicate}));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 20, "Alice", 18.1, null],
[0, 30, "Alice", 21.1, 100],
[0, 30, "David", 24.1, null]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // test early stop
        auto f4_predicate =
            PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f4", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f1",
                                                    FieldType::BIGINT, Literal(40l));
        auto f5_predicate =
            PredicateBuilder::IsNotNull(/*field_index=*/3, /*field_name=*/"f5", FieldType::INT);
        ASSERT_OK_AND_ASSIGN(auto predicate,
                             PredicateBuilder::And({f4_predicate, f1_predicate, f5_predicate}));
        CheckResult(path, {split}, predicate, /*expected_array=*/nullptr);
    }
    {
        // test non result
        auto predicate = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f1",
                                                 FieldType::BIGINT, Literal(40l));
        CheckResult(path, {split}, predicate, /*expected_array=*/nullptr);
    }
    {
        // test not equal predicate for f1 with a literal (40) that no row holds: every row
        // of both files is expected back
        auto predicate = PredicateBuilder::NotEqual(/*field_index=*/0, /*field_name=*/"f1",
                                                    FieldType::BIGINT, Literal(40l));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, 10, "Alice", 11.1, null],
[0, 10, "Bob", 12.1, null],
[0, 10, "Emily", 13.1, null],
[0, 10, "Tony", 14.1, null],
[0, 20, "Lucy", 15.1, null],
[0, 10, "Bob", 16.1, null],
[0, 20, "Tony", 17.1, null],
[0, 20, "Alice", 18.1, null],
[0, 30, "Alice", 21.1, 100],
[0, 20, "Emily", 22.1, 101],
[0, 10, "Bob", 23.1, 100],
[0, 30, "David", 24.1, null]
])"},
                                                             &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
}
// Reads the append_with_bsi table through a split whose single data file lists
// "<data file>.index" in extra_files. First checks that an equality predicate on the string
// column f0 returns every row, then delegates the remaining checks to CheckResultForBsi.
TEST_P(ReadInteWithIndexTest, TestWithBsiIndex) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format + "/append_with_bsi.db/append_with_bsi/";
    std::string file_name;
    if (file_format == "orc") {
        file_name = "data-0befce14-587d-48e5-ad08-efeecceb0f09-0.orc";
    } else if (file_format == "parquet") {
        file_name = "data-af9e31b7-b44d-4a0e-85ae-5877d224ec9f-0.parquet";
    }
    // Read schema: the value-kind special field, f0..f3, and a nanosecond timestamp f4.
    std::vector<DataField> read_fields = {
        SpecialFields::ValueKind(),
        DataField(0, arrow::field("f0", arrow::utf8())),
        DataField(1, arrow::field("f1", arrow::int32())),
        DataField(2, arrow::field("f2", arrow::int32())),
        DataField(3, arrow::field("f3", arrow::float64())),
        DataField(4, arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)))};
    std::shared_ptr<arrow::DataType> arrow_data_type =
        DataField::ConvertDataFieldsToArrowStructType(read_fields);
    // Hand-built meta for the single data file: 8 rows, index referenced via extra_files.
    auto data_file_meta = std::make_shared<DataFileMeta>(
        file_name, /*file_size=*/932,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/
        std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/", {data_file_meta});
    ASSERT_OK_AND_ASSIGN(auto split,
                         builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());
    {
        // test equal predicate for f0, take no effective as bsi does not support string
        auto predicate =
            PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status =
            arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null],
[0, "Tony", null, -2, 17.1, 1745542802000123001],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
                                                             &expected_array);
        // Surface the Arrow error text if the expected JSON fails to parse.
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    CheckResultForBsi(path, arrow_data_type, {split});
}
TEST_P(ReadInteWithIndexTest, TestWithBloomFilterIndex) {
auto [file_format, enable_prefetch] = GetParam();
std::string path =
GetDataDir() + "/" + file_format + "/append_with_bloomfilter.db/append_with_bloomfilter/";
std::string file_name;
if (file_format == "orc") {
file_name = "data-34e8acb2-110b-4c32-9dc6-d6435178d0ad-0.orc";
} else if (file_format == "parquet") {
file_name = "data-79a27bc0-bcec-4062-9915-83ec8eb1622d-0.parquet";
}
std::vector<DataField> read_fields = {
SpecialFields::ValueKind(),
DataField(0, arrow::field("f0", arrow::utf8())),
DataField(1, arrow::field("f1", arrow::int32())),
DataField(2, arrow::field("f2", arrow::int32())),
DataField(3, arrow::field("f3", arrow::float64())),
DataField(4, arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)))};
std::shared_ptr<arrow::DataType> arrow_data_type =
DataField::ConvertDataFieldsToArrowStructType(read_fields);
auto data_file_meta = std::make_shared<DataFileMeta>(
file_name, /*file_size=*/932,
/*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
/*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
/*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
/*max_sequence_number=*/7, /*schema_id=*/0,
/*level=*/0,
/*extra_files=*/
std::vector<std::optional<std::string>>({file_name + ".index"}),
/*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
/*embedded_index=*/nullptr, FileSource::Append(),
/*value_stats_cols=*/std::nullopt,
/*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt);
DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
/*bucket_path=*/path + "bucket-0/", {data_file_meta});
ASSERT_OK_AND_ASSIGN(auto split,
builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());
std::shared_ptr<arrow::ChunkedArray> all_array;
auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null],
[0, "Tony", null, -2, 17.1, 1745542802000123001],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
&all_array);
ASSERT_TRUE(array_status.ok());
{
// test with non predicate
CheckResult(path, {split}, /*predicate=*/nullptr, all_array);
}
{
// test equal predicate for f0
auto predicate =
PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
Literal(FieldType::STRING, "Alice", 5));
CheckResult(path, {split}, predicate, all_array);
}
{
// test equal predicate for f0, where literal does not exist
auto predicate =
PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
Literal(FieldType::STRING, "Alice2", 6));
CheckResult(path, {split}, predicate, nullptr);
}
{
// test equal predicate for f1
auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(200));
CheckResult(path, {split}, predicate, all_array);
}
{
// test equal predicate for f1, where literal does not exist
auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(201));
CheckResult(path, {split}, predicate, nullptr);
}
{
// test equal predicate for f2
auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2",
FieldType::INT, Literal(-1));
CheckResult(path, {split}, predicate, all_array);
}
{
// test equal predicate for f2, where literal does not exist
auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2",
FieldType::INT, Literal(0));
CheckResult(path, {split}, predicate, nullptr);
}
{
// test equal predicate for f3
auto predicate = PredicateBuilder::Equal(/*field_index=*/3, /*field_name=*/"f3",
FieldType::DOUBLE, Literal(13.1));
CheckResult(path, {split}, predicate, all_array);
}
{
// test equal predicate for f3, where literal does not exist
auto predicate = PredicateBuilder::Equal(/*field_index=*/3, /*field_name=*/"f3",
FieldType::DOUBLE, Literal(13.2));
CheckResult(path, {split}, predicate, nullptr);
}
{
// test equal predicate for f4
auto predicate =
PredicateBuilder::Equal(/*field_index=*/4, /*field_name=*/"f4", FieldType::TIMESTAMP,
Literal(Timestamp(1745542902000l, 123000)));
CheckResult(path, {split}, predicate, all_array);
}
{
// test equal predicate for f4, where literal does not exist
auto predicate =
PredicateBuilder::Equal(/*field_index=*/4, /*field_name=*/"f4", FieldType::TIMESTAMP,
Literal(Timestamp(1745542502000l, 123000)));
CheckResult(path, {split}, predicate, nullptr);
}
{
// test not equal predicate for f1
auto predicate = PredicateBuilder::NotEqual(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(200));
CheckResult(path, {split}, predicate, all_array);
}
{
// test is null predicate for f2
auto predicate =
PredicateBuilder::IsNull(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT);
CheckResult(path, {split}, predicate, all_array);
}
{
// test is not null predicate for f2
auto predicate =
PredicateBuilder::IsNotNull(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT);
CheckResult(path, {split}, predicate, all_array);
}
{
// test greater than predicate for f1
auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(200));
CheckResult(path, {split}, predicate, all_array);
}
{
// test in for f2
auto predicate =
PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT,
{Literal(-1), Literal(2), Literal(100)});
CheckResult(path, {split}, predicate, all_array);
}
{
// test in for f2, where literals do not exist
auto predicate =
PredicateBuilder::In(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT,
{Literal(-1000), Literal(0), Literal(1000)});
CheckResult(path, {split}, predicate, nullptr);
}
{
// test not in for f3
auto predicate =
PredicateBuilder::NotIn(/*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE,
{Literal(11.1), Literal(12.1), Literal(13.1)});
CheckResult(path, {split}, predicate, all_array);
}
{
// test not in for f3, where literals do not exist
auto predicate =
PredicateBuilder::NotIn(/*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE,
{Literal(11.12), Literal(12.12), Literal(13.12)});
CheckResult(path, {split}, predicate, all_array);
}
{
// test and predicate
auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(100));
auto f4_predicate =
PredicateBuilder::Equal(/*field_index=*/4, /*field_name=*/"f4", FieldType::TIMESTAMP,
Literal(Timestamp(1745542902000l, 123000)));
ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({f1_predicate, f4_predicate}));
CheckResult(path, {split}, predicate, all_array);
}
{
// test and predicate
auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(100));
auto f4_predicate =
PredicateBuilder::Equal(/*field_index=*/4, /*field_name=*/"f4", FieldType::TIMESTAMP,
Literal(Timestamp(-1728l, 123000)));
ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({f1_predicate, f4_predicate}));
CheckResult(path, {split}, predicate, nullptr);
}
{
// test or predicate
auto f1_predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
FieldType::INT, Literal(100));
auto f4_predicate =
PredicateBuilder::Equal(/*field_index=*/4, /*field_name=*/"f4", FieldType::TIMESTAMP,
Literal(Timestamp(-1728l, 123000)));
ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({f1_predicate, f4_predicate}));
CheckResult(path, {split}, predicate, all_array);
}
}
// Reads the "append_with_bitmap_multi_stripes" fixture, in which every record is stored in its
// own stripe (ORC) or row group (Parquet), and checks that
//   1. the bitmap index is applied (delegated to CheckResultForBitmap), and
//   2. predicates the bitmap index cannot answer still yield exact results, which the inline
//      comments below attribute to predicate push-down.
TEST_P(ReadInteWithIndexTest, TestBitmapPushDownWithMultiStripes) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = paimon::test::GetDataDir() +
                       "/append_with_bitmap_multi_stripes.db/append_with_bitmap_multi_stripes/";
    std::string file_name;
    std::string index_file_name;
    if (file_format == "orc") {
        // each record is a stripe
        file_name = "data-55e21ed3-d118-4d75-a0fd-cd52039cb634-0.orc";
        index_file_name = "data-55e21ed3-d118-4d75-a0fd-cd52039cb634-0.orc.index";
    } else if (file_format == "parquet") {
        // each record is a row group; the index file named after the ORC data file is reused
        file_name = "data-multi-row-groups.parquet";
        index_file_name = "data-55e21ed3-d118-4d75-a0fd-cd52039cb634-0.orc.index";
    } else {
        // Without this branch an unknown format would continue with empty file names and only
        // fail much later with an unrelated-looking read error.
        FAIL() << "no fixture available for file format: " << file_format;
    }

    // Schema to read: the value-kind special field followed by the four user columns.
    std::vector<DataField> read_fields = {SpecialFields::ValueKind(),
                                          DataField(0, arrow::field("f0", arrow::utf8())),
                                          DataField(1, arrow::field("f1", arrow::int32())),
                                          DataField(2, arrow::field("f2", arrow::int32())),
                                          DataField(3, arrow::field("f3", arrow::float64()))};
    std::shared_ptr<arrow::DataType> arrow_data_type =
        DataField::ConvertDataFieldsToArrowStructType(read_fields);

    // Hand-built file meta: 8 rows, empty stats, and the index file listed as an extra file.
    auto data_file_meta = std::make_shared<DataFileMeta>(
        file_name, /*file_size=*/2637,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/
        std::vector<std::optional<std::string>>({index_file_name}),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/", {data_file_meta});
    ASSERT_OK_AND_ASSIGN(auto split,
                         builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());

    // verify that the bitmap index takes effect
    CheckResultForBitmap(path, arrow_data_type, split);

    // verify that predicate push-down takes effect
    {
        // greater-than predicate: has no effect on the bitmap index, but the predicate can
        // still be pushed down
        auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
                                                       FieldType::INT, Literal(10));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Lucy", 20, 1, 15.1],
[0, "Tony", 20, 0, 17.1],
[0, "Alice", 20, null, 18.1]
])"},
            &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // predicate on f3 (which has no index): the predicate can still be pushed down
        auto predicate = PredicateBuilder::Equal(/*field_index=*/3, /*field_name=*/"f3",
                                                 FieldType::DOUBLE, Literal(14.1));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Tony", 10, 0, 14.1]
])"},
            &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // AND predicate: although the bitmap index cannot handle the LessThan predicate, the
        // result is still correct due to predicate push-down
        auto f0_predicate =
            PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        auto f1_predicate = PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
                                                       FieldType::INT, Literal(15));
        ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({f0_predicate, f1_predicate}));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1]
])"},
            &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // OR predicate: although the bitmap index cannot handle the LessOrEqual predicate, the
        // result is still correct due to predicate push-down
        auto f0_predicate =
            PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        auto f1_predicate = PredicateBuilder::LessOrEqual(/*field_index=*/1, /*field_name=*/"f1",
                                                          FieldType::INT, Literal(10));
        ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::Or({f0_predicate, f1_predicate}));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Bob", 10, 1, 12.1],
[0, "Emily", 10, 0, 13.1],
[0, "Tony", 10, 0, 14.1],
[0, "Bob", 10, 1, 16.1],
[0, "Alice", 20, null, 18.1]
])"},
            &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
}
// Reads the "append_with_bsi_bitmap_bloomfilter" fixture and checks the index-assisted results:
// the BSI checks are delegated to CheckResultForBsi, followed by equality predicates that the
// inline comments attribute to the bloom filter (f3) and to the bitmap index (f0).
TEST_P(ReadInteWithIndexTest, TestWithBitmapAndBsiAndBloomFilterIndex) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format +
                       "/append_with_bsi_bitmap_bloomfilter.db/append_with_bsi_bitmap_bloomfilter/";
    std::string file_name;
    if (file_format == "orc") {
        file_name = "data-d77394c5-8f82-4fca-93ca-d4da892e7f4f-0.orc";
    } else if (file_format == "parquet") {
        file_name = "data-15f89029-c10f-437f-b819-338c3e8ab2ee-0.parquet";
    } else {
        // Fail here rather than continuing with an empty file name and an obscure read error.
        FAIL() << "no fixture available for file format: " << file_format;
    }

    // Schema to read: the value-kind special field followed by the five user columns.
    std::vector<DataField> read_fields = {
        SpecialFields::ValueKind(),
        DataField(0, arrow::field("f0", arrow::utf8())),
        DataField(1, arrow::field("f1", arrow::int32())),
        DataField(2, arrow::field("f2", arrow::int32())),
        DataField(3, arrow::field("f3", arrow::float64())),
        DataField(4, arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)))};
    std::shared_ptr<arrow::DataType> arrow_data_type =
        DataField::ConvertDataFieldsToArrowStructType(read_fields);

    // Hand-built file meta: 8 rows, empty stats, and "<file_name>.index" as an extra file.
    auto data_file_meta = std::make_shared<DataFileMeta>(
        file_name, /*file_size=*/932,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/
        std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/", {data_file_meta});
    ASSERT_OK_AND_ASSIGN(auto split,
                         builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());

    CheckResultForBsi(path, arrow_data_type, split);
    {
        // equal predicate for f3 with an existing value: only the bloom filter takes effect,
        // and the expected result is every row of the file
        auto predicate = PredicateBuilder::Equal(/*field_index=*/3, /*field_name=*/"f3",
                                                 FieldType::DOUBLE, Literal(14.1));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null],
[0, "Tony", null, -2, 17.1, 1745542802000123001],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
            &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
    {
        // equal predicate for f3 with a value that is not in the data: only the bloom filter
        // takes effect, and nothing is expected back
        auto predicate = PredicateBuilder::Equal(/*field_index=*/3, /*field_name=*/"f3",
                                                 FieldType::DOUBLE, Literal(14.13));
        CheckResult(path, {split}, predicate, /*expected_array=*/nullptr);
    }
    {
        // equal predicate for f0: the bitmap index takes effect, only matching rows come back
        auto predicate =
            PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
                                    Literal(FieldType::STRING, "Alice", 5));
        std::shared_ptr<arrow::ChunkedArray> expected_array;
        auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
            &expected_array);
        ASSERT_TRUE(array_status.ok()) << array_status.ToString();
        CheckResult(path, {split}, predicate, expected_array);
    }
}
// Reads a file whose index file contains bitmap, BSI and bloom-filter indexes while some of the
// index factories are temporarily unregistered, and checks which predicates still filter rows.
// Each scenario unregisters two factories and re-registers them through a ScopeGuard.
TEST_P(ReadInteWithIndexTest, TestWithIndexWithoutRegistered) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format +
                       "/append_with_bsi_bitmap_bloomfilter.db/append_with_bsi_bitmap_bloomfilter/";
    std::string file_name;
    if (file_format == "orc") {
        file_name = "data-d77394c5-8f82-4fca-93ca-d4da892e7f4f-0.orc";
    } else if (file_format == "parquet") {
        file_name = "data-15f89029-c10f-437f-b819-338c3e8ab2ee-0.parquet";
    } else {
        // Fail before touching the factory registry instead of continuing with an empty name.
        FAIL() << "no fixture available for file format: " << file_format;
    }
    auto factory_creator = FactoryCreator::GetInstance();

    // Schema to read: the value-kind special field followed by the five user columns.
    std::vector<DataField> read_fields = {
        SpecialFields::ValueKind(),
        DataField(0, arrow::field("f0", arrow::utf8())),
        DataField(1, arrow::field("f1", arrow::int32())),
        DataField(2, arrow::field("f2", arrow::int32())),
        DataField(3, arrow::field("f3", arrow::float64())),
        DataField(4, arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)))};
    std::shared_ptr<arrow::DataType> arrow_data_type =
        DataField::ConvertDataFieldsToArrowStructType(read_fields);

    // index file contains bitmap & bsi & bloomfilter
    auto data_file_meta = std::make_shared<DataFileMeta>(
        file_name, /*file_size=*/932,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/
        std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/", {data_file_meta});
    ASSERT_OK_AND_ASSIGN(auto split,
                         builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());
    {
        // only bitmap is registered
        factory_creator->TEST_Unregister("bsi");
        factory_creator->TEST_Unregister("bloom-filter");
        // NOTE(review): relies on ScopeGuard invoking the callback when this scope is left
        // (including the early return of a failed ASSERT) so the registry is restored.
        ScopeGuard guard([&factory_creator]() {
            factory_creator->Register("bsi", (new BitSliceIndexBitmapFileIndexFactory));
            factory_creator->Register("bloom-filter", (new BloomFilterFileIndexFactory));
        });
        {
            // greater-than predicate for f1: as only bitmap is registered, the predicate has
            // no effect and every row is expected back
            auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
                                                           FieldType::INT, Literal(100));
            std::shared_ptr<arrow::ChunkedArray> expected_array;
            auto array_status =
                arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null],
[0, "Tony", null, -2, 17.1, 1745542802000123001],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
                    &expected_array);
            ASSERT_TRUE(array_status.ok()) << array_status.ToString();
            CheckResult(path, {split}, predicate, expected_array);
        }
        {
            // AND predicate: only f1_equals takes effect, so rows with f2 <= 0 remain
            auto f1_equals = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
                                                     FieldType::INT, Literal(100));
            auto f2_greater_than = PredicateBuilder::GreaterThan(
                /*field_index=*/2, /*field_name=*/"f2", FieldType::INT, Literal(0));
            ASSERT_OK_AND_ASSIGN(auto predicate,
                                 PredicateBuilder::And({f1_equals, f2_greater_than}));
            std::shared_ptr<arrow::ChunkedArray> expected_array;
            auto array_status =
                arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 100, 2, 16.1, null]
])"},
                    &expected_array);
            ASSERT_TRUE(array_status.ok()) << array_status.ToString();
            CheckResult(path, {split}, predicate, expected_array);
        }
    }
    {
        // only bsi is registered
        factory_creator->TEST_Unregister("bitmap");
        factory_creator->TEST_Unregister("bloom-filter");
        // NOTE(review): same restore-on-scope-exit assumption as in the scenario above.
        ScopeGuard guard([&factory_creator]() {
            factory_creator->Register("bitmap", (new BitmapFileIndexFactory));
            factory_creator->Register("bloom-filter", (new BloomFilterFileIndexFactory));
        });
        {
            // equal predicate for f0: as bitmap is unregistered, the predicate has no effect
            // and every row is expected back
            auto predicate =
                PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
                                        Literal(FieldType::STRING, "Alice", 5));
            std::shared_ptr<arrow::ChunkedArray> expected_array;
            auto array_status =
                arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 100, -2, 11.1, 1745542802000123000],
[0, "Bob", 200, -3, 12.1, 1745542902000123000],
[0, "Emily", 300, 1, 13.1, 1745542602000123000],
[0, "Tony", 50, 1, 14.1, -1744877000],
[0, "Lucy", 500, -1, 15.1, -1764877000],
[0, "Bob", 100, 2, 16.1, null],
[0, "Tony", null, -2, 17.1, 1745542802000123001],
[0, "Alice", 20, null, 18.1, -1724877000]
])"},
                    &expected_array);
            ASSERT_TRUE(array_status.ok()) << array_status.ToString();
            CheckResult(path, {split}, predicate, expected_array);
        }
        {
            // AND predicate: as bsi is registered, f1_equals and f2_greater_than both take
            // effect
            auto f1_equals = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1",
                                                     FieldType::INT, Literal(100));
            auto f2_greater_than = PredicateBuilder::GreaterThan(
                /*field_index=*/2, /*field_name=*/"f2", FieldType::INT, Literal(0));
            ASSERT_OK_AND_ASSIGN(auto predicate,
                                 PredicateBuilder::And({f1_equals, f2_greater_than}));
            std::shared_ptr<arrow::ChunkedArray> expected_array;
            auto array_status =
                arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Bob", 100, 2, 16.1, null]
])"},
                    &expected_array);
            ASSERT_TRUE(array_status.ok()) << array_status.ToString();
            CheckResult(path, {split}, predicate, expected_array);
        }
    }
}
// Reads the "append_with_bitmap_no_embedding" fixture with an equality predicate on f0 while the
// IO hook is armed with RETURN_ERROR at increasing positions. The loop retries with the next
// position until one complete read succeeds, and that read must return exactly the expected rows.
TEST_P(ReadInteWithIndexTest, TestWithIOException) {
    auto [file_format, enable_prefetch] = GetParam();
    std::string path = GetDataDir() + "/" + file_format +
                       "/append_with_bitmap_no_embedding.db/append_with_bitmap_no_embedding/";
    std::string file_name;
    if (file_format == "orc") {
        file_name = "data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc";
    } else if (file_format == "parquet") {
        file_name = "data-783929b2-49d4-4006-a898-194a62e3278d-0.parquet";
    } else {
        // Fail here rather than letting an empty file name look like an injected IO failure.
        FAIL() << "no fixture available for file format: " << file_format;
    }

    // Schema to read: the value-kind special field followed by the four user columns.
    std::vector<DataField> read_fields = {SpecialFields::ValueKind(),
                                          DataField(0, arrow::field("f0", arrow::utf8())),
                                          DataField(1, arrow::field("f1", arrow::int32())),
                                          DataField(2, arrow::field("f2", arrow::int32())),
                                          DataField(3, arrow::field("f3", arrow::float64()))};
    std::shared_ptr<arrow::DataType> arrow_data_type =
        DataField::ConvertDataFieldsToArrowStructType(read_fields);

    // Hand-built file meta: 8 rows, empty stats, and "<file_name>.index" as an extra file.
    auto data_file_meta = std::make_shared<DataFileMeta>(
        file_name, /*file_size=*/689,
        /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(),
        /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(),
        /*value_stats=*/SimpleStats::EmptyStats(), /*min_sequence_number=*/0,
        /*max_sequence_number=*/7, /*schema_id=*/0,
        /*level=*/0,
        /*extra_files=*/
        std::vector<std::optional<std::string>>({file_name + ".index"}),
        /*creation_time=*/Timestamp(0ll, 0), /*delete_row_count=*/0,
        /*embedded_index=*/nullptr, FileSource::Append(),
        /*value_stats_cols=*/std::nullopt,
        /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt,
        /*write_cols=*/std::nullopt);
    DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0,
                                   /*bucket_path=*/path + "bucket-0/", {data_file_meta});
    ASSERT_OK_AND_ASSIGN(auto split,
                         builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build());

    auto predicate =
        PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING,
                                Literal(FieldType::STRING, "Alice", 5));
    std::shared_ptr<arrow::ChunkedArray> expected_array;
    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([
[0, "Alice", 10, 1, 11.1],
[0, "Alice", 20, null, 18.1]
])"},
        &expected_array);
    ASSERT_TRUE(array_status.ok()) << array_status.ToString();

    bool run_complete = false;
    auto io_hook = IOHook::GetInstance();
    for (size_t i = 0; i < 200; i++) {
        // Clear the hook at the end of every iteration, whichever way the iteration ends.
        ScopeGuard guard([&io_hook]() { io_hook->Clear(); });
        io_hook->Reset(i, IOHook::Mode::RETURN_ERROR);
        ReadContextBuilder context_builder(path);
        context_builder.AddOption("read.batch-size", "2")
            .AddOption("test.enable-adaptive-prefetch-strategy", "false")
            .SetPredicate(predicate);
        if (enable_prefetch) {
            context_builder.EnablePrefetch(true).SetPrefetchBatchCount(3);
        }
        ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
        // NOTE(review): CHECK_HOOK_STATUS is defined elsewhere; presumably it moves on to the
        // next iteration when the status is the injected error -- confirm against the macro.
        Result<std::unique_ptr<TableRead>> table_read = TableRead::Create(std::move(read_context));
        CHECK_HOOK_STATUS(table_read.status(), i);
        Result<std::unique_ptr<BatchReader>> batch_reader = table_read.value()->CreateReader(split);
        CHECK_HOOK_STATUS(batch_reader.status(), i);
        auto result = ReadResultCollector::CollectResult(batch_reader.value().get());
        CHECK_HOOK_STATUS(result.status(), i);
        auto result_array = result.value();
        ASSERT_TRUE(result_array) << "read succeeded but produced no data, hook position " << i;
        ASSERT_TRUE(result_array->Equals(*expected_array))
            << "read result differs from the expected rows, hook position " << i;
        // One fully successful read is enough; stop probing further hook positions.
        run_complete = true;
        break;
    }
    ASSERT_TRUE(run_complete) << "no hook position in [0, 200) allowed a complete read";
}
} // namespace paimon::test