blob: 09dd9ac81b0574542c025774c5d305cdcae78549 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestEntry, ManifestEntryStatus
from pyiceberg.table.delete_file_index import PATH_FIELD_ID, DeleteFileIndex, PositionDeletes
from pyiceberg.typedef import Record
def _create_data_file(file_path: str = "s3://bucket/data.parquet", spec_id: int = 0) -> DataFile:
    """Build a minimal unpartitioned Parquet data file to look up in the index.

    The partition is an empty ``Record``. The partition-spec id is attached by
    setting the private ``_spec_id`` attribute after construction.
    """
    attributes = {
        "content": DataFileContent.DATA,
        "file_path": file_path,
        "file_format": FileFormat.PARQUET,
        "partition": Record(),
        "record_count": 100,
        "file_size_in_bytes": 1000,
    }
    result = DataFile.from_args(**attributes)
    result._spec_id = spec_id
    return result
def _create_positional_delete(
    sequence_number: int = 1, file_path: str = "s3://bucket/data.parquet", spec_id: int = 0
) -> ManifestEntry:
    """Build an ADDED manifest entry wrapping a Parquet position-delete file.

    The delete is pinned to the single data file ``file_path``: both the lower and
    the upper bound of the file-path column (``PATH_FIELD_ID``) are set to that path.
    The delete file's own location is derived from ``sequence_number``.
    """
    target = file_path.encode()
    delete_location = f"s3://bucket/pos-delete-{sequence_number}.parquet"
    pos_delete = DataFile.from_args(
        content=DataFileContent.POSITION_DELETES,
        file_path=delete_location,
        file_format=FileFormat.PARQUET,
        partition=Record(),
        record_count=10,
        file_size_in_bytes=100,
        # Equal lower/upper path bounds mark this delete as targeting exactly one file.
        lower_bounds={PATH_FIELD_ID: target},
        upper_bounds={PATH_FIELD_ID: target},
    )
    pos_delete._spec_id = spec_id
    return ManifestEntry.from_args(
        status=ManifestEntryStatus.ADDED,
        sequence_number=sequence_number,
        data_file=pos_delete,
    )
def _create_partition_delete(sequence_number: int = 1, spec_id: int = 0, partition: Record | None = None) -> ManifestEntry:
    """Build an ADDED manifest entry wrapping a partition-scoped position-delete file.

    Unlike ``_create_positional_delete``, no file-path bounds are set, so the delete
    is not pinned to one data file. The tests in this module expect it to match
    lookups by partition instead.

    Args:
        sequence_number: Sequence number of the manifest entry. It is also used to
            derive the delete file's path.
        spec_id: Partition-spec id attached to the delete file.
        partition: Partition value of the delete file. Defaults to an empty ``Record``.
    """
    delete_file = DataFile.from_args(
        content=DataFileContent.POSITION_DELETES,
        # A prefix distinct from ``_create_positional_delete`` ("pos-delete-") keeps a
        # partition delete and a path delete with the same sequence number from sharing
        # a file path. Lookups return a set, and if DataFile equality is path-based
        # (NOTE(review): confirm in pyiceberg.manifest), two such deletes would collapse.
        file_path=f"s3://bucket/partition-delete-{sequence_number}.parquet",
        file_format=FileFormat.PARQUET,
        # Explicit None check, so a caller-supplied Record is never swapped out
        # merely because it happens to be falsy.
        partition=partition if partition is not None else Record(),
        record_count=10,
        file_size_in_bytes=100,
    )
    delete_file._spec_id = spec_id
    return ManifestEntry.from_args(status=ManifestEntryStatus.ADDED, sequence_number=sequence_number, data_file=delete_file)
def _create_deletion_vector(
    sequence_number: int = 1, file_path: str = "s3://bucket/data.parquet", spec_id: int = 0
) -> ManifestEntry:
    """Build an ADDED manifest entry wrapping a Puffin deletion vector.

    The vector is modelled as ``POSITION_DELETES`` content in ``PUFFIN`` format. Like a
    path-scoped positional delete, it is pinned to ``file_path`` through equal lower and
    upper bounds on ``PATH_FIELD_ID``.
    """
    encoded_path = file_path.encode()
    puffin_location = f"s3://bucket/deletion-vector-{sequence_number}.puffin"
    vector = DataFile.from_args(
        content=DataFileContent.POSITION_DELETES,
        file_path=puffin_location,
        file_format=FileFormat.PUFFIN,
        partition=Record(),
        record_count=10,
        file_size_in_bytes=100,
        lower_bounds={PATH_FIELD_ID: encoded_path},
        upper_bounds={PATH_FIELD_ID: encoded_path},
    )
    vector._spec_id = spec_id
    return ManifestEntry.from_args(
        status=ManifestEntryStatus.ADDED,
        sequence_number=sequence_number,
        data_file=vector,
    )
def test_empty_index() -> None:
    """An index holding no delete files yields an empty set for any data file."""
    empty_index = DeleteFileIndex()
    assert empty_index.for_data_file(1, _create_data_file()) == set()
def test_sequence_number_filtering() -> None:
    """Position deletes apply only to data files with a sequence number <= their own.

    Deletes are registered at sequence numbers 2, 4 and 6. The cases where the data
    file's sequence number equals a delete's (2, 4, 6) check that the comparison is
    inclusive at every threshold, not just the first.
    """
    index = DeleteFileIndex()
    for delete_seq in (2, 4, 6):
        index.add_delete_file(_create_positional_delete(sequence_number=delete_seq))

    data_file = _create_data_file()
    # data sequence number -> number of deletes expected to apply
    expected_counts = {1: 3, 2: 3, 3: 2, 4: 2, 5: 1, 6: 1, 7: 0}
    for data_seq, expected in expected_counts.items():
        actual = len(index.for_data_file(data_seq, data_file))
        assert actual == expected, f"data sequence number {data_seq}: expected {expected} deletes, got {actual}"
def test_path_specific_deletes() -> None:
    """A path-scoped delete matches only the data file named in its path bounds."""
    index = DeleteFileIndex()
    for target in ("s3://bucket/a.parquet", "s3://bucket/b.parquet"):
        index.add_delete_file(_create_positional_delete(sequence_number=2, file_path=target))

    # c.parquet has no delete targeting it, so nothing should match.
    expected_matches = {
        "s3://bucket/a.parquet": 1,
        "s3://bucket/b.parquet": 1,
        "s3://bucket/c.parquet": 0,
    }
    for path, count in expected_matches.items():
        assert len(index.for_data_file(1, _create_data_file(file_path=path))) == count
def test_partitioned_deletes() -> None:
    """A partition-scoped delete matches only lookups made with its own partition value."""
    index = DeleteFileIndex()
    first_partition, second_partition = Record(1), Record(2)
    for part in (first_partition, second_partition):
        entry = _create_partition_delete(sequence_number=2, spec_id=0, partition=part)
        index.add_delete_file(entry, part)

    probe = _create_data_file()
    assert len(index.for_data_file(1, probe, first_partition)) == 1
    assert len(index.for_data_file(1, probe, second_partition)) == 1
    # No delete was registered for partition 3.
    assert len(index.for_data_file(1, probe, Record(3))) == 0
def test_mix_path_and_partition_deletes() -> None:
    """A data file picks up both a path-scoped delete and a partition-scoped delete."""
    target_path = "s3://bucket/a.parquet"
    index = DeleteFileIndex()
    part = Record(1)
    index.add_delete_file(_create_positional_delete(sequence_number=2, file_path=target_path))
    index.add_delete_file(_create_partition_delete(sequence_number=3, spec_id=0, partition=part), part)

    matches = index.for_data_file(1, _create_data_file(file_path=target_path), part)
    assert len(matches) == 2
def test_dvs_treated_as_position_deletes() -> None:
    """A deletion vector and a Parquet position delete for one file are both returned.

    Both are reported with ``POSITION_DELETES`` content.
    """
    target_path = "s3://bucket/a.parquet"
    index = DeleteFileIndex()
    index.add_delete_file(_create_positional_delete(sequence_number=2, file_path=target_path))
    index.add_delete_file(_create_deletion_vector(sequence_number=3, file_path=target_path))

    matches = index.for_data_file(1, _create_data_file(file_path=target_path))
    assert len(matches) == 2
    assert all(match.content == DataFileContent.POSITION_DELETES for match in matches)
def test_cannot_add_after_indexing() -> None:
    """PositionDeletes rejects further add() calls once filter_by_seq has been called."""
    deletes = PositionDeletes()
    deletes.add(_create_positional_delete(sequence_number=1).data_file, 1)

    # Querying by sequence number triggers indexing; after that the group must refuse new files.
    deletes.filter_by_seq(0)

    late_entry = _create_positional_delete(sequence_number=2)
    with pytest.raises(ValueError, match="Cannot add files after indexing"):
        deletes.add(late_entry.data_file, 2)
def test_record_equality_for_partition_lookup() -> None:
    """Partition lookups match on Record value equality, not on object identity.

    A delete is registered under one Record instance. A distinct but equal instance
    must find it, and a Record with different values must not.
    """
    index = DeleteFileIndex()
    registered = Record(1, "foo")
    equal_copy = Record(1, "foo")
    different = Record(1, "bar")

    # Sanity-check the Record equality this test relies on.
    assert registered == equal_copy
    assert registered != different

    index.add_delete_file(_create_partition_delete(sequence_number=2, spec_id=0, partition=registered), registered)

    probe = _create_data_file()
    assert len(index.for_data_file(1, probe, equal_copy)) == 1
    assert len(index.for_data_file(1, probe, different)) == 0