blob: 6c77fad160c8bfbaa655111b500461484723df64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/update/fast_append.h"
#include <format>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "iceberg/avro/avro_register.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/table_metadata.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/test_resource.h"
#include "iceberg/test/update_test_base.h"
#include "iceberg/util/uuid.h"
namespace iceberg {
/// Test fixture for FastAppend.
///
/// Builds on UpdateTestBase, using the "TableMetadataV2ValidMinimal.json" metadata
/// resource, and prepares two data files (`file_a_`, `file_b_`) for tests to append.
class FastAppendTest : public UpdateTestBase {
 protected:
  /// Calls avro::RegisterAll() once for the whole test suite.
  static void SetUpTestSuite() { avro::RegisterAll(); }

  /// Metadata resource name supplied to the base fixture.
  std::string MetadataResource() const override {
    return "TableMetadataV2ValidMinimal.json";
  }

  /// Runs the base set-up, caches the table's spec and schema, then creates the
  /// shared test data files.
  void SetUp() override {
    UpdateTestBase::SetUp();

    // Get partition spec and schema from the base table
    ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
    ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());

    // Create test data files. The argument order is (path, record_count, size);
    // partition_value is left at its default of 0 for both files.
    file_a_ = CreateDataFile("/data/file_a.parquet", /*record_count=*/100,
                             /*size=*/1024);
    file_b_ = CreateDataFile("/data/file_b.parquet", /*record_count=*/200,
                             /*size=*/2048);
  }

  /// Builds a Parquet data file descriptor located under `table_location_`.
  ///
  /// \param path Path suffix appended to `table_location_`.
  /// \param record_count Value stored in `DataFile::record_count`.
  /// \param size Value stored in `DataFile::file_size_in_bytes`.
  /// \param partition_value Long value used as the single partition field (default 0).
  /// \return A new DataFile bound to the spec id of `spec_`.
  std::shared_ptr<DataFile> CreateDataFile(const std::string& path, int64_t record_count,
                                           int64_t size, int64_t partition_value = 0) {
    auto data_file = std::make_shared<DataFile>();
    data_file->content = DataFile::Content::kData;
    data_file->file_path = table_location_ + path;
    data_file->file_format = FileFormatType::kParquet;
    // The base table has partition spec with identity(x), so we need 1 partition value
    data_file->partition =
        PartitionValues(std::vector<Literal>{Literal::Long(partition_value)});
    data_file->file_size_in_bytes = size;
    data_file->record_count = record_count;
    data_file->partition_spec_id = spec_->spec_id();
    return data_file;
  }

  std::shared_ptr<PartitionSpec> spec_;  // Partition spec of the base table.
  std::shared_ptr<Schema> schema_;       // Schema of the base table.
  std::shared_ptr<DataFile> file_a_;     // 100 records, 1024 bytes.
  std::shared_ptr<DataFile> file_b_;     // 200 records, 2048 bytes.
};
// Commits a single data file and checks the added-* counters in the snapshot summary.
TEST_F(FastAppendTest, AppendDataFile) {
  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());

  append_op->AppendFile(file_a_);
  EXPECT_THAT(append_op->Commit(), IsOk());

  // Refresh the table, then inspect its current snapshot.
  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());

  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("added-data-files"), "1");
  EXPECT_EQ(summary.at("added-records"), "100");
  EXPECT_EQ(summary.at("added-files-size"), "1024");
}
// Commits both fixture files in one append and checks the summed summary counters.
TEST_F(FastAppendTest, AppendMultipleDataFiles) {
  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());

  append_op->AppendFile(file_a_);
  append_op->AppendFile(file_b_);
  EXPECT_THAT(append_op->Commit(), IsOk());

  // Refresh the table, then inspect its current snapshot.
  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());

  // Expected totals: 100 + 200 records and 1024 + 2048 bytes.
  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("added-data-files"), "2");
  EXPECT_EQ(summary.at("added-records"), "300");
  EXPECT_EQ(summary.at("added-files-size"), "3072");
}
// Appends a batch of generated files and checks that the summary counters match the
// totals accumulated while building them.
TEST_F(FastAppendTest, AppendManyFiles) {
  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());

  constexpr int kFileCount = 10;
  int64_t expected_records = 0;
  int64_t expected_bytes = 0;
  for (int i = 0; i < kFileCount; ++i) {
    auto generated = CreateDataFile(std::format("/data/file_{}.parquet", i),
                                    /*record_count=*/10 + i,
                                    /*size=*/100 + i * 10,
                                    /*partition_value=*/i % 2);
    // Read the totals before the pointer is moved into the append operation.
    expected_records += generated->record_count;
    expected_bytes += generated->file_size_in_bytes;
    append_op->AppendFile(std::move(generated));
  }

  EXPECT_THAT(append_op->Commit(), IsOk());
  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());

  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("added-data-files"), std::to_string(kFileCount));
  EXPECT_EQ(summary.at("added-records"), std::to_string(expected_records));
  EXPECT_EQ(summary.at("added-files-size"), std::to_string(expected_bytes));
}
// Starting from a table with no current snapshot, a committed append must advance both
// the snapshot's sequence number and the table's last sequence number by one.
TEST_F(FastAppendTest, EmptyTableAppendUpdatesSequenceNumbers) {
  EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
  const int64_t sequence_before = table_->metadata()->last_sequence_number;

  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());
  append_op->AppendFile(file_a_);
  EXPECT_THAT(append_op->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());

  const int64_t sequence_after = sequence_before + 1;
  EXPECT_EQ(committed->sequence_number, sequence_after);
  EXPECT_EQ(table_->metadata()->last_sequence_number, sequence_after);
}
// Appending a null file must make Commit() fail with a descriptive error and leave the
// table without a current snapshot.
TEST_F(FastAppendTest, AppendNullFile) {
  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());
  append_op->AppendFile(nullptr);

  auto commit_outcome = append_op->Commit();
  EXPECT_FALSE(commit_outcome.has_value());
  EXPECT_THAT(commit_outcome, HasErrorMessage("Invalid data file: null"));

  EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
}
// Appending the same file twice must be counted as a single added file.
TEST_F(FastAppendTest, AppendDuplicateFile) {
  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());

  // Add same file twice
  append_op->AppendFile(file_a_);
  append_op->AppendFile(file_a_);
  EXPECT_THAT(append_op->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());

  // Should only count the file once
  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("added-data-files"), "1");
  EXPECT_EQ(summary.at("added-records"), "100");
}
// A property set on the append operation must appear in the committed snapshot summary.
TEST_F(FastAppendTest, SetSnapshotProperty) {
  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());

  append_op->Set("custom-property", "custom-value");
  append_op->AppendFile(file_a_);
  EXPECT_THAT(append_op->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());

  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("custom-property"), "custom-value");
}
} // namespace iceberg