blob: 559aedd8143acb70f7679ee2104f4eb89e9297c1 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <memory>
#include <optional>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "iceberg/test/scan_test_base.h"
namespace iceberg {
class IncrementalAppendScanTest : public ScanTestBase {};
TEST_P(IncrementalAppendScanTest, FromSnapshotInclusive) {
auto version = GetParam();
// Create 3 snapshots, each appending one file
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto snapshot_b =
MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
auto snapshot_c =
MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_c.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_b, snapshot_c}, 3000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}});
// Test: from_snapshot_inclusive(snapshot_a) should return 3 files (A, B, C)
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(1000L, /*inclusive=*/true);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 3);
EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre(
"/path/to/file_a.parquet", "/path/to/file_b.parquet",
"/path/to/file_c.parquet"));
}
// Test: from_snapshot_inclusive(snapshot_a).to_snapshot(snapshot_c) should return 3
// files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(1000L, /*inclusive=*/true).ToSnapshot(3000L);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 3);
}
}
TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithNonExistingRef) {
auto metadata = MakeTableMetadata({}, -1L);
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("non_existing_ref", /*inclusive=*/true);
EXPECT_THAT(builder->Build(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Cannot find ref: non_existing_ref")));
}
TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithTag) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto snapshot_b = MakeAppendSnapshot(
version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"});
auto snapshot_current = MakeAppendSnapshot(
version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", "/path/to/file_a2.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_b, snapshot_current}, 3000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})},
{"t1", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})},
{"t2", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})}});
// Test: from_snapshot_inclusive(t1) should return 5 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("t1", /*inclusive=*/true);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 5);
}
// Test: from_snapshot_inclusive(t1).to_snapshot(t2) should return 3 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("t1", /*inclusive=*/true).ToSnapshot("t2");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 3);
}
}
TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithBranchShouldFail) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a}, 1000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})},
{"b1", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})}});
// Test: from_snapshot_inclusive(branch_name) should fail
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("b1", /*inclusive=*/true);
EXPECT_THAT(builder->Build(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Ref b1 is not a tag")));
}
// Test: to_snapshot(branch_name) should fail
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(1000L, /*inclusive=*/true).ToSnapshot("b1");
EXPECT_THAT(builder->Build(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Ref b1 is not a tag")));
}
}
TEST_P(IncrementalAppendScanTest, UseBranch) {
auto version = GetParam();
// Common ancestor
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
// Main branch snapshots
auto snapshot_main_b = MakeAppendSnapshot(
version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"});
auto snapshot_current = MakeAppendSnapshot(
version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", "/path/to/file_a2.parquet"});
// Branch b1 snapshots
auto snapshot_branch_b =
MakeAppendSnapshot(version, 4000L, 1000L, 2L, {"/path/to/file_c_branch.parquet"});
auto snapshot_branch_c =
MakeAppendSnapshot(version, 5000L, 4000L, 3L, {"/path/to/file_c_branch2.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_main_b, snapshot_current, snapshot_branch_b,
snapshot_branch_c},
3000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})},
{"t1", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})},
{"t2", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})},
{"b1", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 5000L, .retention = SnapshotRef::Branch{}})}});
// Test: from_snapshot_inclusive(t1) on main should return 5 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("t1", /*inclusive=*/true);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 5);
}
// Test: from_snapshot_inclusive(t1).use_branch(b1) should return 3 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("t1", /*inclusive=*/true).UseBranch("b1");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 3);
}
// Test: to_snapshot(snapshot_branch_b).use_branch(b1) should return 2 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot(4000L).UseBranch("b1");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 2);
}
// Test: to_snapshot(snapshot_branch_c).use_branch(b1) should return 3 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot(5000L).UseBranch("b1");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 3);
}
// Test: from_snapshot_exclusive(t1).to_snapshot(snapshot_branch_b).use_branch(b1)
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("t1", /*inclusive=*/false).ToSnapshot(4000L).UseBranch("b1");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 1);
}
}
TEST_P(IncrementalAppendScanTest, UseBranchWithTagShouldFail) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a}, 1000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})},
{"t1", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})}});
// Test: use_branch(tag_name) should fail
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(1000L, /*inclusive=*/true).UseBranch("t1");
EXPECT_THAT(builder->Build(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Ref t1 is not a branch")));
}
TEST_P(IncrementalAppendScanTest, UseBranchWithInvalidSnapshotShouldFail) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto snapshot_main_b = MakeAppendSnapshot(
version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"});
auto snapshot_branch_b =
MakeAppendSnapshot(version, 3000L, 1000L, 2L, {"/path/to/file_d.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_main_b, snapshot_branch_b}, 2000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})},
{"b1", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}});
// Test: to_snapshot(snapshot_main_b).use_branch(b1) should fail
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot(2000L).UseBranch("b1");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
EXPECT_THAT(
scan->PlanFiles(),
::testing::AllOf(
IsError(ErrorKind::kValidationFailed),
HasErrorMessage(
"End snapshot is not a valid snapshot on the current branch: b1")));
}
// Test: from_snapshot_inclusive(snapshot_main_b).use_branch(b1) should fail
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(2000L, /*inclusive=*/true).UseBranch("b1");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
EXPECT_THAT(
scan->PlanFiles(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Starting snapshot (inclusive) 2000 is not an "
"ancestor of end snapshot 3000")));
}
}
TEST_P(IncrementalAppendScanTest, UseBranchWithNonExistingRef) {
auto metadata = MakeTableMetadata({}, -1L);
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->UseBranch("non_existing_ref");
EXPECT_THAT(builder->Build(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Cannot find ref: non_existing_ref")));
}
TEST_P(IncrementalAppendScanTest, FromSnapshotExclusive) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto snapshot_b =
MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
auto snapshot_c =
MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_c.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_b, snapshot_c}, 3000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}});
// Test: from_snapshot_exclusive(snapshot_a) should return 2 files (B, C)
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(1000L, /*inclusive=*/false);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 2);
EXPECT_THAT(GetPaths(tasks),
testing::UnorderedElementsAre("/path/to/file_b.parquet",
"/path/to/file_c.parquet"));
}
// Test: from_snapshot_exclusive(snapshot_a).to_snapshot(snapshot_b) should return 1
// file (B)
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(1000L, /*inclusive=*/false).ToSnapshot(2000L);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 1);
EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/file_b.parquet");
}
}
TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithNonExistingRef) {
auto metadata = MakeTableMetadata({}, -1L);
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("nonExistingRef", /*inclusive=*/false);
EXPECT_THAT(builder->Build(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Cannot find ref: nonExistingRef")));
}
TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithTag) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto snapshot_b = MakeAppendSnapshot(
version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"});
auto snapshot_current = MakeAppendSnapshot(
version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", "/path/to/file_a2.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_b, snapshot_current}, 3000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})},
{"t1", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})},
{"t2", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})}});
// Test: from_snapshot_exclusive(t1) should return 4 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("t1", /*inclusive=*/false);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 4);
}
// Test: from_snapshot_exclusive(t1).to_snapshot(t2) should return 2 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("t1", /*inclusive=*/false).ToSnapshot("t2");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 2);
}
}
TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithBranchShouldFail) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a}, 1000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})},
{"b1", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})}});
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot("b1", /*inclusive=*/false);
EXPECT_THAT(builder->Build(), ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Ref b1 is not a tag")));
}
TEST_P(IncrementalAppendScanTest, ToSnapshot) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto snapshot_b =
MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
auto snapshot_c =
MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_c.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_b, snapshot_c}, 3000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})}});
// Test: to_snapshot(snapshot_b) should return 2 files (A, B)
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot(2000L);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 2);
EXPECT_THAT(GetPaths(tasks),
testing::UnorderedElementsAre("/path/to/file_a.parquet",
"/path/to/file_b.parquet"));
}
}
TEST_P(IncrementalAppendScanTest, ToSnapshotWithTag) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto snapshot_b =
MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
auto snapshot_current =
MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_b2.parquet"});
auto snapshot_branch_b =
MakeAppendSnapshot(version, 4000L, 2000L, 3L, {"/path/to/file_c.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_b, snapshot_current, snapshot_branch_b}, 3000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})},
{"b1", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 4000L, .retention = SnapshotRef::Branch{}})},
{"t1", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})},
{"t2", std::make_shared<SnapshotRef>(
SnapshotRef{.snapshot_id = 4000L, .retention = SnapshotRef::Tag{}})}});
// Test: to_snapshot(t1) should return 2 files
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot("t1");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 2);
}
// Test: to_snapshot(t2) should return 3 files (on branch b1)
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot("t2");
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 3);
}
}
TEST_P(IncrementalAppendScanTest, ToSnapshotWithNonExistingRef) {
auto metadata = MakeTableMetadata({}, -1L);
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot("non_existing_ref");
EXPECT_THAT(builder->Build(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Cannot find ref: non_existing_ref")));
}
TEST_P(IncrementalAppendScanTest, ToSnapshotWithBranchShouldFail) {
auto version = GetParam();
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
auto snapshot_b =
MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_b}, 2000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})},
{"b1", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})}});
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot("b1");
EXPECT_THAT(builder->Build(), ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Ref b1 is not a tag")));
}
TEST_P(IncrementalAppendScanTest, MultipleRootSnapshots) {
auto version = GetParam();
// Snapshot A (will be "expired" by not having it as parent of C)
auto snapshot_a =
MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
// Snapshot B (staged, orphaned - not an ancestor of main branch)
auto snapshot_b =
MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
// Snapshot C (new root after A is expired)
auto snapshot_c =
MakeAppendSnapshot(version, 3000L, std::nullopt, 3L, {"/path/to/file_c.parquet"});
// Snapshot D
auto snapshot_d =
MakeAppendSnapshot(version, 4000L, 3000L, 4L, {"/path/to/file_d.parquet"});
// Note: snapshot_a is kept in metadata but not in the parent chain of C/D
// This simulates expiring snapshot A, creating two root snapshots (B and C)
auto metadata = MakeTableMetadata(
{snapshot_a, snapshot_b, snapshot_c, snapshot_d}, 4000L,
{{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 4000L, .retention = SnapshotRef::Branch{}})}});
// Test: to_snapshot(snapshot_d) should discover snapshots C and D only
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->ToSnapshot(4000L);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
ASSERT_EQ(tasks.size(), 2);
EXPECT_THAT(GetPaths(tasks),
testing::UnorderedElementsAre("/path/to/file_c.parquet",
"/path/to/file_d.parquet"));
}
// Test: from_snapshot_exclusive(snapshot_b).to_snapshot(snapshot_d) should fail
// because B is not a parent ancestor of D
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(2000L, /*inclusive=*/false).ToSnapshot(4000L);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
EXPECT_THAT(
scan->PlanFiles(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Starting snapshot (exclusive) 2000 is not a "
"parent ancestor of end snapshot 4000")));
}
// Test: from_snapshot_inclusive(snapshot_b).to_snapshot(snapshot_d) should fail
// because B is not an ancestor of D
{
ICEBERG_UNWRAP_OR_FAIL(auto builder,
IncrementalAppendScanBuilder::Make(metadata, file_io_));
builder->FromSnapshot(2000L, /*inclusive=*/true).ToSnapshot(4000L);
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
EXPECT_THAT(
scan->PlanFiles(),
::testing::AllOf(IsError(ErrorKind::kValidationFailed),
HasErrorMessage("Starting snapshot (inclusive) 2000 is not an "
"ancestor of end snapshot 4000")));
}
}
INSTANTIATE_TEST_SUITE_P(IncrementalAppendScanVersions, IncrementalAppendScanTest,
testing::Values(1, 2, 3));
} // namespace iceberg