blob: d9fc1e9276ecd0ac44f4e4415a8574fe81efb936 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/table_scan.h"

#include <chrono>
#include <cstdint>
#include <format>
#include <memory>
#include <optional>
#include <ranges>
#include <string>
#include <unordered_map>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/arrow/arrow_file_io.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/expression/expressions.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/test/matchers.h"
#include "iceberg/transform.h"
#include "iceberg/type.h"
#include "iceberg/util/timepoint.h"
namespace iceberg {
/// \brief Parameterized test fixture for DataTableScan planning.
///
/// The test parameter is the Iceberg table format version (1, 2, or 3) used
/// when serializing manifests and manifest lists through the Avro writers.
/// All file access goes through an in-memory mock FileIO, so tests never touch
/// the real filesystem.
class TableScanTest : public testing::TestWithParam<int8_t> {
 protected:
  void SetUp() override {
    avro::RegisterAll();  // manifest (de)serialization is Avro-based
    file_io_ = arrow::MakeMockFileIO();
    // Two-column table: id (int, field 1) and data (string, field 2).
    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
    unpartitioned_spec_ = PartitionSpec::Unpartitioned();
    // Partitioned spec buckets the "data" column (source field 2) into 16.
    ICEBERG_UNWRAP_OR_FAIL(
        partitioned_spec_,
        PartitionSpec::Make(
            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
                                           "data_bucket_16_2", Transform::Bucket(16))}));
    MakeTableMetadata();
  }

  /// \brief Populate table_metadata_ with a single-snapshot v2 table whose
  /// "main" branch points at snapshot 1000.
  void MakeTableMetadata() {
    constexpr int64_t kSnapshotId = 1000L;
    constexpr int64_t kSequenceNumber = 1L;
    const TimePointMs kTimestampMs =
        TimePointMsFromUnixMs(1609459200000L);  // 2021-01-01 00:00:00 UTC
    auto snapshot = std::make_shared<Snapshot>(
        Snapshot{.snapshot_id = kSnapshotId,
                 .parent_snapshot_id = std::nullopt,
                 .sequence_number = kSequenceNumber,
                 .timestamp_ms = kTimestampMs,
                 .manifest_list = "/tmp/metadata/snap-1000-1-manifest-list.avro",
                 .schema_id = schema_->schema_id()});
    table_metadata_ = std::make_shared<TableMetadata>(
        TableMetadata{.format_version = 2,
                      .table_uuid = "test-table-uuid",
                      .location = "/tmp/table",
                      .last_sequence_number = kSequenceNumber,
                      .last_updated_ms = kTimestampMs,
                      .last_column_id = 2,
                      .schemas = {schema_},
                      .current_schema_id = schema_->schema_id(),
                      .partition_specs = {partitioned_spec_, unpartitioned_spec_},
                      .default_spec_id = partitioned_spec_->spec_id(),
                      .last_partition_id = 1000,
                      .current_snapshot_id = kSnapshotId,
                      .snapshots = {snapshot},
                      .snapshot_log = {SnapshotLogEntry{.timestamp_ms = kTimestampMs,
                                                        .snapshot_id = kSnapshotId}},
                      .default_sort_order_id = 0,
                      .refs = {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                                            .snapshot_id = kSnapshotId,
                                            .retention = SnapshotRef::Branch{}})}}});
  }

  /// \brief Build a position-delete DataFile.
  ///
  /// \param referenced_file When set, scopes the deletes to that single data
  ///        file (the spec's `referenced_data_file` field).
  std::shared_ptr<DataFile> MakePositionDeleteFile(
      const std::string& path, const PartitionValues& partition, int32_t spec_id,
      std::optional<std::string> referenced_file = std::nullopt) {
    return std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kPositionDeletes,
        .file_path = path,
        .file_format = FileFormatType::kParquet,
        .partition = partition,
        .record_count = 1,
        .file_size_in_bytes = 10,
        .referenced_data_file = std::move(referenced_file),
        .partition_spec_id = spec_id,
    });
  }

  /// \brief Build an equality-delete DataFile matching on `equality_ids`
  /// (defaults to field 1, the "id" column).
  std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
                                                   const PartitionValues& partition,
                                                   int32_t spec_id,
                                                   std::vector<int> equality_ids = {1}) {
    return std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kEqualityDeletes,
        .file_path = path,
        .file_format = FileFormatType::kParquet,
        .partition = partition,
        .record_count = 1,
        .file_size_in_bytes = 10,
        .equality_ids = std::move(equality_ids),
        .partition_spec_id = spec_id,
    });
  }

  /// \brief Generate a manifest path unique within this test process
  /// (counter + wall-clock ticks), so manifests written by different test
  /// cases never collide inside the shared mock FileIO.
  std::string MakeManifestPath() {
    static int counter = 0;
    return std::format("manifest-{}-{}.avro", counter++,
                       std::chrono::system_clock::now().time_since_epoch().count());
  }

  /// \brief Build a data-content DataFile.
  ///
  /// \param lower_id / upper_id Optional column stats (serialized bounds) for
  ///        field 1 ("id"); used by tests exercising stats-based pruning.
  std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
                                         const PartitionValues& partition,
                                         int32_t spec_id, int64_t record_count = 1,
                                         std::optional<int32_t> lower_id = std::nullopt,
                                         std::optional<int32_t> upper_id = std::nullopt) {
    auto file = std::make_shared<DataFile>(DataFile{
        .file_path = path,
        .file_format = FileFormatType::kParquet,
        .partition = partition,
        .record_count = record_count,
        .file_size_in_bytes = 10,
        .sort_order_id = 0,
        .partition_spec_id = spec_id,
    });
    // Set lower/upper bounds for field_id=1 ("id" column) if provided.
    if (lower_id.has_value()) {
      file->lower_bounds[1] = Literal::Int(lower_id.value()).Serialize().value();
    }
    if (upper_id.has_value()) {
      file->upper_bounds[1] = Literal::Int(upper_id.value()).Serialize().value();
    }
    return file;
  }

  /// \brief Wrap a DataFile in a ManifestEntry; data and file sequence numbers
  /// are set to the same value.
  ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
                          int64_t sequence_number, std::shared_ptr<DataFile> file) {
    return ManifestEntry{
        .status = status,
        .snapshot_id = snapshot_id,
        .sequence_number = sequence_number,
        .file_sequence_number = sequence_number,
        .data_file = std::move(file),
    };
  }

  /// \brief Write `entries` into a new data manifest and return its descriptor.
  ManifestFile WriteDataManifest(int8_t format_version, int64_t snapshot_id,
                                 std::vector<ManifestEntry> entries,
                                 std::shared_ptr<PartitionSpec> spec) {
    return WriteManifest(ManifestContent::kData, format_version, snapshot_id,
                         std::move(entries), std::move(spec));
  }

  /// \brief Write `entries` into a new delete manifest and return its
  /// descriptor.
  ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id,
                                   std::vector<ManifestEntry> entries,
                                   std::shared_ptr<PartitionSpec> spec) {
    return WriteManifest(ManifestContent::kDeletes, format_version, snapshot_id,
                         std::move(entries), std::move(spec));
  }

  /// \brief Shared implementation for WriteDataManifest / WriteDeleteManifest.
  ///
  /// A first_row_id of 0 is assigned only to data manifests in format v3+;
  /// delete manifests never carry one (previously expressed by omitting the
  /// argument — assumes MakeWriter defaults that parameter to nullopt).
  ManifestFile WriteManifest(ManifestContent content, int8_t format_version,
                             int64_t snapshot_id, std::vector<ManifestEntry> entries,
                             std::shared_ptr<PartitionSpec> spec) {
    const std::string manifest_path = MakeManifestPath();
    const auto first_row_id =
        (content == ManifestContent::kData && format_version >= 3)
            ? std::optional<int64_t>(0L)
            : std::nullopt;
    auto writer_result =
        ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_,
                                   std::move(spec), schema_, content, first_row_id);
    EXPECT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());
    for (const auto& entry : entries) {
      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
    }
    EXPECT_THAT(writer->Close(), IsOk());
    auto manifest_result = writer->ToManifestFile();
    EXPECT_THAT(manifest_result, IsOk());
    return std::move(manifest_result.value());
  }

  /// \brief Generate a process-unique manifest-list path (see
  /// MakeManifestPath for the uniqueness scheme).
  std::string MakeManifestListPath() {
    static int counter = 0;
    return std::format("manifest-list-{}-{}.avro", counter++,
                       std::chrono::system_clock::now().time_since_epoch().count());
  }

  /// \brief Write a manifest list referencing `manifests` and return its path.
  ///
  /// The sequence number is only written for v2+ lists and first_row_id only
  /// for v3+, mirroring what each format version supports.
  std::string WriteManifestList(int8_t format_version, int64_t snapshot_id,
                                int64_t sequence_number,
                                const std::vector<ManifestFile>& manifests) {
    const std::string manifest_list_path = MakeManifestListPath();
    constexpr int64_t kParentSnapshotId = 0L;
    auto writer_result = ManifestListWriter::MakeWriter(
        format_version, snapshot_id, kParentSnapshotId, manifest_list_path, file_io_,
        /*sequence_number=*/format_version >= 2 ? std::optional(sequence_number)
                                                : std::nullopt,
        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : std::nullopt);
    EXPECT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());
    EXPECT_THAT(writer->AddAll(manifests), IsOk());
    EXPECT_THAT(writer->Close(), IsOk());
    return manifest_list_path;
  }

  /// \brief Map of spec id -> spec for the two specs this fixture defines.
  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> GetSpecsById() {
    return {{partitioned_spec_->spec_id(), partitioned_spec_},
            {unpartitioned_spec_->spec_id(), unpartitioned_spec_}};
  }

  /// \brief Extract the data-file paths from a set of planned scan tasks.
  static std::vector<std::string> GetPaths(
      const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
    return tasks | std::views::transform([](const auto& task) {
             return task->data_file()->file_path;
           }) |
           std::ranges::to<std::vector<std::string>>();
  }

  std::shared_ptr<FileIO> file_io_;                   // in-memory mock FS
  std::shared_ptr<Schema> schema_;                    // id:int32, data:string
  std::shared_ptr<PartitionSpec> partitioned_spec_;   // bucket(16) on "data"
  std::shared_ptr<PartitionSpec> unpartitioned_spec_;
  std::shared_ptr<TableMetadata> table_metadata_;     // built by MakeTableMetadata()
};
TEST_P(TableScanTest, TableScanBuilderOptions) {
  // A builder with nothing configured yields a scan reflecting the defaults.
  ICEBERG_UNWRAP_OR_FAIL(auto default_builder,
                         DataTableScanBuilder::Make(table_metadata_, file_io_));
  ICEBERG_UNWRAP_OR_FAIL(auto default_scan, default_builder->Build());
  EXPECT_NE(default_scan, nullptr);
  EXPECT_EQ(default_scan->metadata(), table_metadata_);
  EXPECT_EQ(default_scan->io(), file_io_);
  EXPECT_TRUE(default_scan->is_case_sensitive());

  // Apply every builder option in a single fluent chain.
  auto id_only_schema = std::make_shared<Schema>(
      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
  auto id_filter = Expressions::Equal("id", Literal::Int(42));
  constexpr int64_t kMinRows = 1000;
  constexpr int64_t kSnapshotId = 1000L;
  ICEBERG_UNWRAP_OR_FAIL(auto chained_builder,
                         DataTableScanBuilder::Make(table_metadata_, file_io_));
  ICEBERG_UNWRAP_OR_FAIL(auto configured_scan,
                         chained_builder->Option("key1", "value1")
                             .Option("key2", "value2")
                             .CaseSensitive(false)
                             .Project(id_only_schema)
                             .Filter(id_filter)
                             .IncludeColumnStats({"id", "data"})
                             .IgnoreResiduals()
                             .MinRowsRequested(kMinRows)
                             .UseSnapshot(kSnapshotId)
                             .Build());

  // Each option must round-trip through the built scan and its context.
  ICEBERG_UNWRAP_OR_FAIL(auto projected, configured_scan->schema());
  EXPECT_EQ(projected, id_only_schema);
  EXPECT_EQ(configured_scan->filter(), id_filter);
  EXPECT_FALSE(configured_scan->is_case_sensitive());
  const auto& ctx = configured_scan->context();
  EXPECT_EQ(ctx.options.at("key1"), "value1");
  EXPECT_EQ(ctx.options.at("key2"), "value2");
  EXPECT_TRUE(ctx.return_column_stats);
  EXPECT_EQ(ctx.columns_to_keep_stats.size(), 2);
  EXPECT_TRUE(ctx.columns_to_keep_stats.contains(1));  // "id"
  EXPECT_TRUE(ctx.columns_to_keep_stats.contains(2));  // "data"
  EXPECT_TRUE(ctx.ignore_residuals);
  EXPECT_TRUE(ctx.min_rows_requested.has_value());
  EXPECT_EQ(ctx.min_rows_requested.value(), kMinRows);
  EXPECT_TRUE(ctx.snapshot_id.has_value());
  EXPECT_EQ(ctx.snapshot_id.value(), kSnapshotId);

  // UseRef resolves a named branch to its snapshot.
  ICEBERG_UNWRAP_OR_FAIL(auto ref_builder,
                         DataTableScanBuilder::Make(table_metadata_, file_io_));
  ref_builder->UseRef("main");
  ICEBERG_UNWRAP_OR_FAIL(auto ref_scan, ref_builder->Build());
  ICEBERG_UNWRAP_OR_FAIL(auto resolved_snapshot, ref_scan->snapshot());
  EXPECT_EQ(resolved_snapshot->snapshot_id, 1000L);
}
TEST_P(TableScanTest, TableScanBuilderValidationErrors) {
  // MinRowsRequested rejects negative values at Build() time.
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           DataTableScanBuilder::Make(table_metadata_, file_io_));
    builder->MinRowsRequested(-1);
    EXPECT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
  }
  // A snapshot id absent from the metadata fails validation.
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           DataTableScanBuilder::Make(table_metadata_, file_io_));
    builder->UseSnapshot(9999L);
    EXPECT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
  }
  // An unknown ref name fails validation as well.
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           DataTableScanBuilder::Make(table_metadata_, file_io_));
    builder->UseRef("non-existent-ref");
    EXPECT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
  }
  // Null metadata or null IO is rejected up front by Make().
  EXPECT_THAT(DataTableScanBuilder::Make(nullptr, file_io_),
              IsError(ErrorKind::kInvalidArgument));
  EXPECT_THAT(DataTableScanBuilder::Make(table_metadata_, nullptr),
              IsError(ErrorKind::kInvalidArgument));
}
TEST_P(TableScanTest, DataTableScanPlanFilesEmpty) {
  // A table with no snapshot at all should plan to zero file scan tasks.
  auto snapshotless_metadata = std::make_shared<TableMetadata>(
      TableMetadata{.format_version = 2,
                    .schemas = {schema_},
                    .current_schema_id = schema_->schema_id(),
                    .partition_specs = {unpartitioned_spec_},
                    .default_spec_id = unpartitioned_spec_->spec_id(),
                    .current_snapshot_id = -1,  // sentinel: no current snapshot
                    .snapshots = {},
                    .refs = {}});
  ICEBERG_UNWRAP_OR_FAIL(auto scan_builder,
                         DataTableScanBuilder::Make(snapshotless_metadata, file_io_));
  ICEBERG_UNWRAP_OR_FAIL(auto scan, scan_builder->Build());
  ICEBERG_UNWRAP_OR_FAIL(auto planned_tasks, scan->PlanFiles());
  EXPECT_TRUE(planned_tasks.empty());
}
TEST_P(TableScanTest, PlanFilesWithDataManifests) {
  // Writes two data files into a single manifest and verifies that PlanFiles()
  // yields one task per live data file.
  auto version = GetParam();  // manifest/manifest-list format version under test
  constexpr int64_t kSnapshotId = 1000L;
  const auto part_value = PartitionValues({Literal::Int(0)});
  std::vector<ManifestEntry> data_entries{
      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
                MakeDataFile("/path/to/data1.parquet", part_value,
                             partitioned_spec_->spec_id(), /*record_count=*/100)),
      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
                MakeDataFile("/path/to/data2.parquet", part_value,
                             partitioned_spec_->spec_id(), /*record_count=*/200))};
  auto data_manifest =
      WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_);
  std::string manifest_list_path =
      WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, {data_manifest});
  // Create a snapshot that references this manifest list
  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
  auto snapshot_with_manifest =
      std::make_shared<Snapshot>(Snapshot{.snapshot_id = kSnapshotId,
                                          .parent_snapshot_id = std::nullopt,
                                          .sequence_number = 1L,
                                          .timestamp_ms = timestamp_ms,
                                          .manifest_list = manifest_list_path,
                                          .summary = {},
                                          .schema_id = schema_->schema_id()});
  // NOTE(review): format_version is fixed at 2 here while the manifests above
  // are written with `version` (1-3); presumably planning reads the manifest
  // files' own embedded version -- confirm this mismatch is intentional.
  auto metadata_with_manifest = std::make_shared<TableMetadata>(
      TableMetadata{.format_version = 2,
                    .table_uuid = "test-table-uuid",
                    .location = "/tmp/table",
                    .last_sequence_number = 1L,
                    .last_updated_ms = timestamp_ms,
                    .last_column_id = 2,
                    .schemas = {schema_},
                    .current_schema_id = schema_->schema_id(),
                    .partition_specs = {partitioned_spec_, unpartitioned_spec_},
                    .default_spec_id = partitioned_spec_->spec_id(),
                    .last_partition_id = 1000,
                    .current_snapshot_id = kSnapshotId,
                    .snapshots = {snapshot_with_manifest},
                    .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms,
                                                      .snapshot_id = kSnapshotId}},
                    .default_sort_order_id = 0,
                    .refs = {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                                          .snapshot_id = kSnapshotId,
                                          .retention = SnapshotRef::Branch{},
                                      })}}});
  ICEBERG_UNWRAP_OR_FAIL(auto builder,
                         DataTableScanBuilder::Make(metadata_with_manifest, file_io_));
  ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
  ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
  // Both added files are live in the current snapshot -> two tasks.
  ASSERT_EQ(tasks.size(), 2);
  EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet",
                                                             "/path/to/data2.parquet"));
}
TEST_P(TableScanTest, PlanFilesWithMultipleManifests) {
  // Two manifests, each carrying one data file in a different partition; the
  // plan should surface both files regardless of which manifest held them.
  auto version = GetParam();
  const auto partition_a = PartitionValues({Literal::Int(0)});
  const auto partition_b = PartitionValues({Literal::Int(1)});
  // Create first data manifest
  std::vector<ManifestEntry> data_entries_1{MakeEntry(
      ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1,
      MakeDataFile("/path/to/data1.parquet", partition_a, partitioned_spec_->spec_id()))};
  auto data_manifest_1 = WriteDataManifest(version, /*snapshot_id=*/1000L,
                                           std::move(data_entries_1), partitioned_spec_);
  // Create second data manifest
  std::vector<ManifestEntry> data_entries_2{MakeEntry(
      ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1,
      MakeDataFile("/path/to/data2.parquet", partition_b, partitioned_spec_->spec_id()))};
  auto data_manifest_2 = WriteDataManifest(version, /*snapshot_id=*/1000L,
                                           std::move(data_entries_2), partitioned_spec_);
  // Write manifest list with multiple manifests
  std::string manifest_list_path =
      WriteManifestList(version, /*snapshot_id=*/1000L, /*sequence_number=*/1,
                        {data_manifest_1, data_manifest_2});
  // Create a snapshot that references this manifest list
  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
  auto snapshot_with_manifests =
      std::make_shared<Snapshot>(Snapshot{.snapshot_id = 1000L,
                                          .parent_snapshot_id = std::nullopt,
                                          .sequence_number = 1L,
                                          .timestamp_ms = timestamp_ms,
                                          .manifest_list = manifest_list_path,
                                          .summary = {},
                                          .schema_id = schema_->schema_id()});
  // NOTE(review): metadata stays format_version 2 while manifests use the
  // parameterized `version`; see PlanFilesWithDataManifests for the same
  // mismatch -- confirm intentional.
  auto metadata_with_manifests = std::make_shared<TableMetadata>(
      TableMetadata{.format_version = 2,
                    .table_uuid = "test-table-uuid",
                    .location = "/tmp/table",
                    .last_sequence_number = 1L,
                    .last_updated_ms = timestamp_ms,
                    .last_column_id = 2,
                    .schemas = {schema_},
                    .current_schema_id = schema_->schema_id(),
                    .partition_specs = {partitioned_spec_, unpartitioned_spec_},
                    .default_spec_id = partitioned_spec_->spec_id(),
                    .last_partition_id = 1000,
                    .current_snapshot_id = 1000L,
                    .snapshots = {snapshot_with_manifests},
                    .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms,
                                                      .snapshot_id = 1000L}},
                    .default_sort_order_id = 0,
                    .refs = {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                                          .snapshot_id = 1000L,
                                          .retention = SnapshotRef::Branch{},
                                      })}}});
  ICEBERG_UNWRAP_OR_FAIL(auto builder,
                         DataTableScanBuilder::Make(metadata_with_manifests, file_io_));
  ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
  ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
  // One task per data file across both manifests.
  ASSERT_EQ(tasks.size(), 2);
  EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet",
                                                             "/path/to/data2.parquet"));
}
TEST_P(TableScanTest, PlanFilesWithFilter) {
  // Verifies stats-based file pruning: each data file carries serialized
  // lower/upper bounds for field 1 ("id"), and a scan filter on "id" should
  // skip files whose bound range cannot contain matching rows.
  auto version = GetParam();
  constexpr int64_t kSnapshotId = 1000L;
  const auto part_value = PartitionValues({Literal::Int(0)});
  // Create two data files with non-overlapping id ranges:
  // - data1.parquet: id range [1, 50]
  // - data2.parquet: id range [51, 100]
  std::vector<ManifestEntry> data_entries{
      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
                MakeDataFile("/path/to/data1.parquet", part_value,
                             partitioned_spec_->spec_id(), /*record_count=*/1,
                             /*lower_id=*/1, /*upper_id=*/50)),
      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
                MakeDataFile("/path/to/data2.parquet", part_value,
                             partitioned_spec_->spec_id(), /*record_count=*/1,
                             /*lower_id=*/51, /*upper_id=*/100))};
  auto data_manifest =
      WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_);
  std::string manifest_list_path =
      WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, {data_manifest});
  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
  auto snapshot = std::make_shared<Snapshot>(Snapshot{.snapshot_id = kSnapshotId,
                                                      .parent_snapshot_id = std::nullopt,
                                                      .sequence_number = 1L,
                                                      .timestamp_ms = timestamp_ms,
                                                      .manifest_list = manifest_list_path,
                                                      .schema_id = schema_->schema_id()});
  auto metadata = std::make_shared<TableMetadata>(TableMetadata{
      .format_version = 2,
      .table_uuid = "test-table-uuid",
      .location = "/tmp/table",
      .last_sequence_number = 1L,
      .last_updated_ms = timestamp_ms,
      .last_column_id = 2,
      .schemas = {schema_},
      .current_schema_id = schema_->schema_id(),
      .partition_specs = {partitioned_spec_, unpartitioned_spec_},
      .default_spec_id = partitioned_spec_->spec_id(),
      .last_partition_id = 1000,
      .current_snapshot_id = kSnapshotId,
      .snapshots = {snapshot},
      .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms,
                                        .snapshot_id = kSnapshotId}},
      .default_sort_order_id = 0,
      .refs = {{"main",
                std::make_shared<SnapshotRef>(SnapshotRef{
                    .snapshot_id = kSnapshotId, .retention = SnapshotRef::Branch{}})}}});
  // Test 1: Filter matches only data1.parquet (id=25 is in range [1, 50])
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_));
    builder->Filter(Expressions::Equal("id", Literal::Int(25)));
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    ASSERT_EQ(tasks.size(), 1);
    EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/data1.parquet");
  }
  // Test 2: Filter matches only data2.parquet (id=75 is in range [51, 100])
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_));
    builder->Filter(Expressions::Equal("id", Literal::Int(75)));
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    ASSERT_EQ(tasks.size(), 1);
    EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/data2.parquet");
  }
  // Test 3: Filter matches both files (id > 0 covers both ranges)
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_));
    builder->Filter(Expressions::GreaterThan("id", Literal::Int(0)));
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    ASSERT_EQ(tasks.size(), 2);
    EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet",
                                                               "/path/to/data2.parquet"));
  }
  // Test 4: Filter matches no files (id=200 is outside both ranges)
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_));
    builder->Filter(Expressions::Equal("id", Literal::Int(200)));
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    EXPECT_TRUE(tasks.empty());
  }
}
TEST_P(TableScanTest, PlanFilesWithDeleteFiles) {
  // Verifies that delete manifests are read during planning and their delete
  // files get attached to the matching data-file tasks.
  auto version = GetParam();
  if (version < 2) {
    GTEST_SKIP() << "Delete files only supported in V2+";
  }
  constexpr int64_t kSnapshotId = 1000L;
  const auto part_value = PartitionValues({Literal::Int(0)});
  // Create data manifest with files (data at sequence number 1).
  std::vector<ManifestEntry> data_entries{
      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
                MakeDataFile("/path/to/data1.parquet", part_value,
                             partitioned_spec_->spec_id(), /*record_count=*/100)),
      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
                MakeDataFile("/path/to/data2.parquet", part_value,
                             partitioned_spec_->spec_id(), /*record_count=*/200))};
  auto data_manifest =
      WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_);
  // Delete files are added at sequence number 2, after the data at sequence 1,
  // so they apply to both data files. The position delete names
  // data1.parquet via referenced_data_file; the equality delete matches on
  // field 1 ("id").
  std::vector<ManifestEntry> delete_entries{
      MakeEntry(
          ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/2,
          MakePositionDeleteFile("/path/to/pos_delete.parquet", part_value,
                                 partitioned_spec_->spec_id(), "/path/to/data1.parquet")),
      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/2,
                MakeEqualityDeleteFile("/path/to/eq_delete.parquet", part_value,
                                       partitioned_spec_->spec_id(), {1}))};
  auto delete_manifest = WriteDeleteManifest(
      version, kSnapshotId, std::move(delete_entries), partitioned_spec_);
  std::string manifest_list_path = WriteManifestList(
      version, kSnapshotId, /*sequence_number=*/2, {data_manifest, delete_manifest});
  // Create a snapshot that references this manifest list
  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
  auto snapshot_with_manifests =
      std::make_shared<Snapshot>(Snapshot{.snapshot_id = kSnapshotId,
                                          .parent_snapshot_id = std::nullopt,
                                          .sequence_number = 2L,
                                          .timestamp_ms = timestamp_ms,
                                          .manifest_list = manifest_list_path,
                                          .summary = {},
                                          .schema_id = schema_->schema_id()});
  auto metadata_with_manifests = std::make_shared<TableMetadata>(
      TableMetadata{.format_version = 2,
                    .table_uuid = "test-table-uuid",
                    .location = "/tmp/table",
                    .last_sequence_number = 2L,
                    .last_updated_ms = timestamp_ms,
                    .last_column_id = 2,
                    .schemas = {schema_},
                    .current_schema_id = schema_->schema_id(),
                    .partition_specs = {partitioned_spec_, unpartitioned_spec_},
                    .default_spec_id = partitioned_spec_->spec_id(),
                    .last_partition_id = 1000,
                    .current_snapshot_id = kSnapshotId,
                    .snapshots = {snapshot_with_manifests},
                    .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms,
                                                      .snapshot_id = kSnapshotId}},
                    .default_sort_order_id = 0,
                    .refs = {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                                          .snapshot_id = kSnapshotId,
                                          .retention = SnapshotRef::Branch{},
                                      })}}});
  ICEBERG_UNWRAP_OR_FAIL(auto builder,
                         DataTableScanBuilder::Make(metadata_with_manifests, file_io_));
  ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
  ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
  ASSERT_EQ(tasks.size(), 2);
  EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet",
                                                             "/path/to/data2.parquet"));
  // Verify that delete files are associated with the tasks. Both tasks must
  // carry at least one delete (the equality delete applies table-wide within
  // the partition; the position delete targets data1 only).
  for (const auto& task : tasks) {
    EXPECT_GT(task->delete_files().size(), 0);
  }
}
TEST_P(TableScanTest, SchemaWithSelectedColumnsAndFilter) {
  // Verifies how the projected scan schema is derived: it is the union of the
  // explicitly selected columns and any columns referenced by the filter
  // expression (without duplicates), as the four assertion blocks below show.
  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
      SchemaField::MakeRequired(/*field_id=*/2, "data", string()),
      SchemaField::MakeRequired(/*field_id=*/3, "value", int64())});
  auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
  auto metadata = std::make_shared<TableMetadata>(TableMetadata{
      .format_version = 2,
      .table_uuid = "test-table-uuid",
      .location = "/tmp/table",
      .last_sequence_number = 1L,
      .last_updated_ms = timestamp_ms,
      .last_column_id = 3,
      .schemas = {schema},
      .current_schema_id = schema->schema_id(),
      .partition_specs = {unpartitioned_spec_},
      .default_spec_id = unpartitioned_spec_->spec_id(),
      .last_partition_id = 1000,
      .current_snapshot_id = 1000L,
      .snapshots = {std::make_shared<Snapshot>(Snapshot{
          .snapshot_id = 1000L,
          .parent_snapshot_id = std::nullopt,
          .sequence_number = 1L,
          .timestamp_ms = timestamp_ms,
          .manifest_list = "/tmp/metadata/snap-1000-1-manifest-list.avro",
          .schema_id = schema->schema_id(),
      })},
      .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms,
                                        .snapshot_id = 1000L}},
      .default_sort_order_id = 0,
      .refs = {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                            .snapshot_id = 1000L,
                            .retention = SnapshotRef::Branch{},
                        })}},
  });
  // Select "data" column, filter on "id" column -> projection keeps both,
  // because the filter's column is pulled into the schema.
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_));
    builder->Select({"data"}).Filter(Expressions::Equal("id", Literal::Int(42)));
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());
    ASSERT_EQ(projected_schema->fields().size(), 2);
    ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
    EXPECT_TRUE(id_field.has_value());
    EXPECT_EQ(id_field->get().field_id(), 1);
    ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
    EXPECT_TRUE(data_field.has_value());
    EXPECT_EQ(data_field->get().field_id(), 2);
  }
  // Select "id" and "value", filter on "data" -> all three columns project.
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_));
    builder->Select({"id", "value"})
        .Filter(Expressions::Equal("data", Literal::String("test")));
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());
    ASSERT_EQ(projected_schema->fields().size(), 3);
    ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
    EXPECT_TRUE(id_field.has_value());
    ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
    EXPECT_TRUE(data_field.has_value());
    ICEBERG_UNWRAP_OR_FAIL(auto value_field, projected_schema->FindFieldByName("value"));
    EXPECT_TRUE(value_field.has_value());
  }
  // Select "id", filter on "id" - should only have "id" once (no duplicates
  // when selection and filter reference the same column).
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_));
    builder->Select({"id"}).Filter(Expressions::Equal("id", Literal::Int(42)));
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());
    ASSERT_EQ(projected_schema->fields().size(), 1);
    ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
    EXPECT_TRUE(id_field.has_value());
    EXPECT_EQ(id_field->get().field_id(), 1);
  }
  // Select columns without filter -> only the selected column projects.
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_));
    builder->Select({"data"});
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());
    ASSERT_EQ(projected_schema->fields().size(), 1);
    ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
    EXPECT_TRUE(data_field.has_value());
    EXPECT_EQ(data_field->get().field_id(), 2);
  }
}
// Run every test in this suite against table format versions 1, 2, and 3.
INSTANTIATE_TEST_SUITE_P(TableScanVersions, TableScanTest, testing::Values(1, 2, 3));
} // namespace iceberg