blob: 6dcd564bf91df109635f006614fd8945afdbe61e [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/data/delete_loader.h"
#include <arrow/array.h>
#include <arrow/c/bridge.h>
#include <arrow/json/from_string.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/data/equality_delete_writer.h"
#include "iceberg/data/position_delete_writer.h"
#include "iceberg/deletes/position_delete_index.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/parquet/parquet_register.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/partition_values.h"
#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/schema_internal.h"
#include "iceberg/test/matchers.h"
#include "iceberg/type.h"
#include "iceberg/util/struct_like_set.h"
namespace iceberg {
class DeleteLoaderTest : public ::testing::Test {
protected:
static void SetUpTestSuite() { parquet::RegisterAll(); }
void SetUp() override {
file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
schema_ = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(2, "name", string())});
partition_spec_ = PartitionSpec::Unpartitioned();
loader_ = std::make_unique<DeleteLoader>(file_io_);
}
/// Write position deletes using WriteDelete API and return the DataFile metadata.
std::shared_ptr<DataFile> WritePositionDeletes(
const std::string& path,
const std::vector<std::pair<std::string, int64_t>>& deletes) {
PositionDeleteWriterOptions options{
.path = path,
.schema = schema_,
.spec = partition_spec_,
.partition = PartitionValues{},
.format = FileFormatType::kParquet,
.io = file_io_,
.flush_threshold = 10000,
.properties = {{"write.parquet.compression-codec", "uncompressed"}},
};
auto writer = PositionDeleteWriter::Make(options).value();
for (const auto& [file_path, pos] : deletes) {
ICEBERG_THROW_NOT_OK(writer->WriteDelete(file_path, pos));
}
ICEBERG_THROW_NOT_OK(writer->Close());
return writer->Metadata().value().data_files[0];
}
/// Write equality deletes and return the DataFile metadata.
std::shared_ptr<DataFile> WriteEqualityDeletes(
const std::string& path, const std::string& json_data,
std::vector<int32_t> equality_field_ids = {1, 2}) {
EqualityDeleteWriterOptions options{
.path = path,
.schema = schema_,
.spec = partition_spec_,
.partition = PartitionValues{},
.format = FileFormatType::kParquet,
.io = file_io_,
.equality_field_ids = std::move(equality_field_ids),
.properties = {{"write.parquet.compression-codec", "uncompressed"}},
};
auto writer = EqualityDeleteWriter::Make(options).value();
ArrowSchema arrow_c_schema;
ICEBERG_THROW_NOT_OK(ToArrowSchema(*schema_, &arrow_c_schema));
auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
auto test_data = ::arrow::json::ArrayFromJSONString(
::arrow::struct_(arrow_type->fields()), json_data)
.ValueOrDie();
ArrowArray arrow_array;
auto export_status = ::arrow::ExportArray(*test_data, &arrow_array);
if (!export_status.ok()) throw std::runtime_error(export_status.ToString());
ICEBERG_THROW_NOT_OK(writer->Write(&arrow_array));
ICEBERG_THROW_NOT_OK(writer->Close());
return writer->Metadata().value().data_files[0];
}
std::shared_ptr<FileIO> file_io_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<PartitionSpec> partition_spec_;
std::unique_ptr<DeleteLoader> loader_;
};
// --- Position Delete Tests ---
TEST_F(DeleteLoaderTest, LoadPositionDeletesEmpty) {
std::vector<std::shared_ptr<DataFile>> empty;
auto result = loader_->LoadPositionDeletes(empty, "data.parquet");
ASSERT_THAT(result, IsOk());
ASSERT_TRUE(result->IsEmpty());
}
TEST_F(DeleteLoaderTest, LoadPositionDeletesSingleFile) {
auto delete_file = WritePositionDeletes(
"pos_deletes_1.parquet",
{{"data.parquet", 0}, {"data.parquet", 5}, {"data.parquet", 10}});
std::vector<std::shared_ptr<DataFile>> files = {delete_file};
auto result = loader_->LoadPositionDeletes(files, "data.parquet");
ASSERT_THAT(result, IsOk());
auto& index = result.value();
ASSERT_FALSE(index.IsEmpty());
ASSERT_EQ(index.Cardinality(), 3);
ASSERT_TRUE(index.IsDeleted(0));
ASSERT_TRUE(index.IsDeleted(5));
ASSERT_TRUE(index.IsDeleted(10));
ASSERT_FALSE(index.IsDeleted(1));
ASSERT_FALSE(index.IsDeleted(6));
}
TEST_F(DeleteLoaderTest, LoadPositionDeletesMultipleFiles) {
auto file1 = WritePositionDeletes("pos_deletes_a.parquet",
{{"data.parquet", 1}, {"data.parquet", 3}});
auto file2 = WritePositionDeletes("pos_deletes_b.parquet",
{{"data.parquet", 5}, {"data.parquet", 7}});
std::vector<std::shared_ptr<DataFile>> files = {file1, file2};
auto result = loader_->LoadPositionDeletes(files, "data.parquet");
ASSERT_THAT(result, IsOk());
auto& index = result.value();
ASSERT_EQ(index.Cardinality(), 4);
ASSERT_TRUE(index.IsDeleted(1));
ASSERT_TRUE(index.IsDeleted(3));
ASSERT_TRUE(index.IsDeleted(5));
ASSERT_TRUE(index.IsDeleted(7));
}
TEST_F(DeleteLoaderTest, LoadPositionDeletesFiltersByDataFilePath) {
auto delete_file =
WritePositionDeletes("pos_deletes_mixed.parquet", {{"data_a.parquet", 0},
{"data_a.parquet", 2},
{"data_b.parquet", 10},
{"data_b.parquet", 20}});
std::vector<std::shared_ptr<DataFile>> files = {delete_file};
// Load only positions for data_a.parquet
auto result_a = loader_->LoadPositionDeletes(files, "data_a.parquet");
ASSERT_THAT(result_a, IsOk());
ASSERT_EQ(result_a.value().Cardinality(), 2);
ASSERT_TRUE(result_a.value().IsDeleted(0));
ASSERT_TRUE(result_a.value().IsDeleted(2));
ASSERT_FALSE(result_a.value().IsDeleted(10));
// Load only positions for data_b.parquet
auto result_b = loader_->LoadPositionDeletes(files, "data_b.parquet");
ASSERT_THAT(result_b, IsOk());
ASSERT_EQ(result_b.value().Cardinality(), 2);
ASSERT_TRUE(result_b.value().IsDeleted(10));
ASSERT_TRUE(result_b.value().IsDeleted(20));
ASSERT_FALSE(result_b.value().IsDeleted(0));
// Load for non-existent file
auto result_none = loader_->LoadPositionDeletes(files, "nonexistent.parquet");
ASSERT_THAT(result_none, IsOk());
ASSERT_TRUE(result_none.value().IsEmpty());
}
TEST_F(DeleteLoaderTest, LoadPositionDeletesSkipsMismatchedReferencedDataFile) {
auto delete_file = std::make_shared<DataFile>(DataFile{
.content = DataFile::Content::kPositionDeletes,
.file_path = "missing-pos-delete.parquet",
.file_format = FileFormatType::kParquet,
.referenced_data_file = "other-data.parquet",
});
std::vector<std::shared_ptr<DataFile>> files = {delete_file};
auto result = loader_->LoadPositionDeletes(files, "data.parquet");
ASSERT_THAT(result, IsOk());
ASSERT_TRUE(result.value().IsEmpty());
}
TEST_F(DeleteLoaderTest, LoadPositionDeletesRejectsDV) {
auto dv_file = std::make_shared<DataFile>(DataFile{
.content = DataFile::Content::kPositionDeletes,
.file_path = "dv.puffin",
.file_format = FileFormatType::kPuffin,
});
std::vector<std::shared_ptr<DataFile>> files = {dv_file};
auto result = loader_->LoadPositionDeletes(files, "data.parquet");
ASSERT_THAT(result, IsError(ErrorKind::kNotSupported));
}
TEST_F(DeleteLoaderTest, LoadPositionDeletesRejectsWrongContent) {
auto data_file = std::make_shared<DataFile>(DataFile{
.content = DataFile::Content::kData,
.file_path = "data.parquet",
.file_format = FileFormatType::kParquet,
});
std::vector<std::shared_ptr<DataFile>> files = {data_file};
auto result = loader_->LoadPositionDeletes(files, "data.parquet");
ASSERT_THAT(result, IsError(ErrorKind::kInvalidArgument));
}
// --- Equality Delete Tests ---
TEST_F(DeleteLoaderTest, LoadEqualityDeletesEmpty) {
std::vector<std::shared_ptr<DataFile>> empty;
StructType eq_type(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(2, "name", string()),
});
auto result = loader_->LoadEqualityDeletes(empty, eq_type);
ASSERT_THAT(result, IsOk());
ASSERT_TRUE(result.value()->IsEmpty());
ASSERT_EQ(result.value()->Size(), 0);
}
TEST_F(DeleteLoaderTest, LoadEqualityDeletesSingleFile) {
auto delete_file =
WriteEqualityDeletes("eq_deletes_1.parquet", R"([[1, "Alice"], [2, "Bob"]])");
StructType eq_type(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(2, "name", string()),
});
std::vector<std::shared_ptr<DataFile>> files = {delete_file};
auto result = loader_->LoadEqualityDeletes(files, eq_type);
ASSERT_THAT(result, IsOk());
ASSERT_FALSE(result.value()->IsEmpty());
ASSERT_EQ(result.value()->Size(), 2);
}
TEST_F(DeleteLoaderTest, LoadEqualityDeletesMultipleFiles) {
auto file1 = WriteEqualityDeletes("eq_deletes_a.parquet", R"([[1, "Alice"]])");
auto file2 =
WriteEqualityDeletes("eq_deletes_b.parquet", R"([[2, "Bob"], [3, "Charlie"]])");
StructType eq_type(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
SchemaField::MakeOptional(2, "name", string()),
});
std::vector<std::shared_ptr<DataFile>> files = {file1, file2};
auto result = loader_->LoadEqualityDeletes(files, eq_type);
ASSERT_THAT(result, IsOk());
ASSERT_EQ(result.value()->Size(), 3);
}
TEST_F(DeleteLoaderTest, LoadEqualityDeletesRejectsWrongContent) {
auto data_file = std::make_shared<DataFile>(DataFile{
.content = DataFile::Content::kData,
.file_path = "data.parquet",
.file_format = FileFormatType::kParquet,
});
StructType eq_type(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", int32()),
});
std::vector<std::shared_ptr<DataFile>> files = {data_file};
auto result = loader_->LoadEqualityDeletes(files, eq_type);
ASSERT_THAT(result, IsError(ErrorKind::kInvalidArgument));
}
} // namespace iceberg