# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
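"""Tests for reading, writing, and caching Iceberg manifests and manifest lists."""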
from tempfile import TemporaryDirectory
import fastavro
import pytest
from pyiceberg.avro.codecs import AvroCompressionCodec
from pyiceberg.io import load_file_io
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import (
DataFile,
DataFileContent,
FileFormat,
ManifestContent,
ManifestEntry,
ManifestEntryStatus,
ManifestFile,
PartitionFieldSummary,
_manifest_cache,
_manifests,
read_manifest_list,
write_manifest,
write_manifest_list,
)
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.typedef import Record, TableVersion
from pyiceberg.types import IntegerType, NestedField
@pytest.fixture(autouse=True)
def clear_global_manifests_cache() -> None:
# Clear the global cache before each test
_manifest_cache.clear()
def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None:
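    """Assert that each expected key/value pair appears in the Avro file's header metadata."""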
with open(avro_file, "rb") as f:
reader = fastavro.reader(f)
metadata = reader.metadata
for k, v in expected_metadata.items():
assert k in metadata
assert metadata[k] == v
def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
manifest = ManifestFile.from_args(
manifest_path=generated_manifest_entry_file,
manifest_length=0,
partition_spec_id=0,
added_snapshot_id=0,
sequence_number=0,
partitions=[],
)
manifest_entries = manifest.fetch_manifest_entry(PyArrowFileIO())
manifest_entry = manifest_entries[0]
assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
assert manifest_entry.sequence_number == 0
assert isinstance(manifest_entry.data_file, DataFile)
data_file = manifest_entry.data_file
assert data_file.content == DataFileContent.DATA
assert data_file.file_path == (
"/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/"
"00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
assert data_file.file_format == FileFormat.PARQUET
assert repr(data_file.partition) == "Record[1, 1925]"
assert data_file.record_count == 19513
assert data_file.file_size_in_bytes == 388872
assert data_file.column_sizes == {
1: 53,
2: 98153,
3: 98693,
4: 53,
5: 53,
6: 53,
7: 17425,
8: 18528,
9: 53,
10: 44788,
11: 35571,
12: 53,
13: 1243,
14: 2355,
15: 12750,
16: 4029,
17: 110,
18: 47194,
19: 2948,
}
assert data_file.value_counts == {
1: 19513,
2: 19513,
3: 19513,
4: 19513,
5: 19513,
6: 19513,
7: 19513,
8: 19513,
9: 19513,
10: 19513,
11: 19513,
12: 19513,
13: 19513,
14: 19513,
15: 19513,
16: 19513,
17: 19513,
18: 19513,
19: 19513,
}
assert data_file.null_value_counts == {
1: 19513,
2: 0,
3: 0,
4: 19513,
5: 19513,
6: 19513,
7: 0,
8: 0,
9: 19513,
10: 0,
11: 0,
12: 19513,
13: 0,
14: 0,
15: 0,
16: 0,
17: 0,
18: 0,
19: 0,
}
    assert data_file.nan_value_counts == {10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, 16: 0, 17: 0, 18: 0, 19: 0}
assert data_file.lower_bounds == {
2: b"\x01\x00\x00\x00\x00\x00\x00\x00",
3: b"\x01\x00\x00\x00\x00\x00\x00\x00",
7: b"\x03\x00\x00\x00",
8: b"\x01\x00\x00\x00",
10: b"\xf6(\\\x8f\xc2\x05S\xc0",
11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
15: b")\\\x8f\xc2\xf5(\x08\xc0",
16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
}
assert data_file.upper_bounds == {
2: b"\x06\x00\x00\x00\x00\x00\x00\x00",
3: b"\x06\x00\x00\x00\x00\x00\x00\x00",
7: b"\t\x01\x00\x00",
8: b"\t\x01\x00\x00",
10: b"\xcd\xcc\xcc\xcc\xcc,_@",
11: b"\x1f\x85\xebQ\\\xe2\xfe@",
13: b"\x00\x00\x00\x00\x00\x00\x12@",
14: b"\x00\x00\x00\x00\x00\x00\xe0?",
15: b"q=\n\xd7\xa3\xf01@",
16: b"\x00\x00\x00\x00\x00`B@",
17: b"333333\xd3?",
18: b"\x00\x00\x00\x00\x00\x18b@",
19: b"\x00\x00\x00\x00\x00\x00\x04@",
}
assert data_file.key_metadata is None
assert data_file.split_offsets == [4]
assert data_file.equality_ids is None
assert data_file.sort_order_id == 0
def test_read_manifest_list(generated_manifest_file_file_v1: str) -> None:
input_file = PyArrowFileIO().new_input(generated_manifest_file_file_v1)
manifest_list = list(read_manifest_list(input_file))[0]
assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
assert manifest_list.added_snapshot_id == 9182715666859759686
assert manifest_list.added_files_count == 3
assert manifest_list.existing_files_count == 0
assert manifest_list.deleted_files_count == 0
assert isinstance(manifest_list.partitions, list)
partitions_summary = manifest_list.partitions[0]
assert isinstance(partitions_summary, PartitionFieldSummary)
assert partitions_summary.contains_null is True
assert partitions_summary.contains_nan is False
assert partitions_summary.lower_bound == b"\x01\x00\x00\x00"
assert partitions_summary.upper_bound == b"\x02\x00\x00\x00"
assert manifest_list.added_rows_count == 237993
assert manifest_list.existing_rows_count == 0
assert manifest_list.deleted_rows_count == 0
def test_read_manifest_v1(generated_manifest_file_file_v1: str) -> None:
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v1,
summary=Summary(Operation.APPEND),
schema_id=3,
)
manifest_list = snapshot.manifests(io)[0]
assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
assert manifest_list.content == ManifestContent.DATA
assert manifest_list.sequence_number == 0
assert manifest_list.min_sequence_number == 0
assert manifest_list.added_snapshot_id == 9182715666859759686
assert manifest_list.added_files_count == 3
assert manifest_list.existing_files_count == 0
assert manifest_list.deleted_files_count == 0
assert manifest_list.added_rows_count == 237993
assert manifest_list.existing_rows_count == 0
assert manifest_list.deleted_rows_count == 0
assert manifest_list.key_metadata is None
assert isinstance(manifest_list.partitions, list)
partition = manifest_list.partitions[0]
assert isinstance(partition, PartitionFieldSummary)
assert partition.contains_null is True
assert partition.contains_nan is False
assert partition.lower_bound == b"\x01\x00\x00\x00"
assert partition.upper_bound == b"\x02\x00\x00\x00"
entries = manifest_list.fetch_manifest_entry(io)
assert isinstance(entries, list)
entry = entries[0]
assert entry.sequence_number == 0
assert entry.file_sequence_number == 0
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)
manifest_list = snapshot.manifests(io)[0]
assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
assert manifest_list.content == ManifestContent.DELETES
assert manifest_list.sequence_number == 3
assert manifest_list.min_sequence_number == 3
assert manifest_list.added_snapshot_id == 9182715666859759686
assert manifest_list.added_files_count == 3
assert manifest_list.existing_files_count == 0
assert manifest_list.deleted_files_count == 0
assert manifest_list.added_rows_count == 237993
assert manifest_list.existing_rows_count == 0
assert manifest_list.deleted_rows_count == 0
assert manifest_list.key_metadata is None
assert isinstance(manifest_list.partitions, list)
partition = manifest_list.partitions[0]
assert isinstance(partition, PartitionFieldSummary)
assert partition.contains_null is True
assert partition.contains_nan is False
assert partition.lower_bound == b"\x01\x00\x00\x00"
assert partition.upper_bound == b"\x02\x00\x00\x00"
entries = manifest_list.fetch_manifest_entry(io)
assert isinstance(entries, list)
entry = entries[0]
assert entry.sequence_number == 3
assert entry.file_sequence_number == 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None:
"""Test that ManifestFile objects are cached and reused across multiple reads.
The cache now stores individual ManifestFile objects by their manifest_path,
rather than caching entire manifest list tuples. This is more memory-efficient
when multiple manifest lists share overlapping ManifestFile objects.
"""
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)
    # Read the manifest list through snapshot.manifests() multiple times
manifests_first_call = snapshot.manifests(io)
manifests_second_call = snapshot.manifests(io)
# Ensure that the same manifest list content is returned
assert manifests_first_call == manifests_second_call
# Verify that ManifestFile objects are the same instances (cached)
for mf1, mf2 in zip(manifests_first_call, manifests_second_call, strict=True):
assert mf1 is mf2, "ManifestFile objects should be the same cached instance"
def test_write_empty_manifest() -> None:
io = load_file_io()
test_schema = Schema(NestedField(1, "foo", IntegerType(), False))
with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/test_write_manifest.avro"
with pytest.raises(ValueError, match="An empty manifest file has been written"):
with write_manifest(
format_version=1,
spec=UNPARTITIONED_PARTITION_SPEC,
schema=test_schema,
output_file=io.new_output(tmp_avro_file),
snapshot_id=8744736658442914487,
avro_compression="deflate",
) as _:
pass
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize("compression", ["null", "deflate", "zstd"])
def test_write_manifest(
generated_manifest_file_file_v1: str,
generated_manifest_file_file_v2: str,
format_version: TableVersion,
test_schema: Schema,
test_partition_spec: PartitionSpec,
compression: AvroCompressionCodec,
) -> None:
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)
demo_manifest_file = snapshot.manifests(io)[0]
manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/test_write_manifest.avro"
output = io.new_output(tmp_avro_file)
with write_manifest(
format_version=format_version,
spec=test_partition_spec,
schema=test_schema,
output_file=output,
snapshot_id=8744736658442914487,
avro_compression=compression,
) as writer:
for entry in manifest_entries:
writer.add_entry(entry)
new_manifest = writer.to_manifest_file()
with pytest.raises(RuntimeError):
            # The writer is already closed once the with-block has exited
writer.add_entry(manifest_entries[0])
expected_metadata = {
"schema": test_schema.model_dump_json(),
"partition-spec": (
'[{"source-id":1,"field-id":1000,"transform":"identity","name":"VendorID"},'
'{"source-id":2,"field-id":1001,"transform":"day","name":"tpep_pickup_day"}]'
),
"partition-spec-id": str(demo_manifest_file.partition_spec_id),
"format-version": str(format_version),
}
_verify_metadata_with_fastavro(
tmp_avro_file,
expected_metadata,
)
new_manifest_entries = new_manifest.fetch_manifest_entry(io)
manifest_entry = new_manifest_entries[0]
assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
        assert manifest_entry.sequence_number == (-1 if format_version == 1 else 3)
assert isinstance(manifest_entry.data_file, DataFile)
data_file = manifest_entry.data_file
assert data_file.content == DataFileContent.DATA
assert data_file.file_path == (
"/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/"
"00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
assert data_file.file_format == FileFormat.PARQUET
assert data_file.partition == Record(1, 1925)
assert data_file.record_count == 19513
assert data_file.file_size_in_bytes == 388872
assert data_file.column_sizes == {
1: 53,
2: 98153,
3: 98693,
4: 53,
5: 53,
6: 53,
7: 17425,
8: 18528,
9: 53,
10: 44788,
11: 35571,
12: 53,
13: 1243,
14: 2355,
15: 12750,
16: 4029,
17: 110,
18: 47194,
19: 2948,
}
assert data_file.value_counts == {
1: 19513,
2: 19513,
3: 19513,
4: 19513,
5: 19513,
6: 19513,
7: 19513,
8: 19513,
9: 19513,
10: 19513,
11: 19513,
12: 19513,
13: 19513,
14: 19513,
15: 19513,
16: 19513,
17: 19513,
18: 19513,
19: 19513,
}
assert data_file.null_value_counts == {
1: 19513,
2: 0,
3: 0,
4: 19513,
5: 19513,
6: 19513,
7: 0,
8: 0,
9: 19513,
10: 0,
11: 0,
12: 19513,
13: 0,
14: 0,
15: 0,
16: 0,
17: 0,
18: 0,
19: 0,
}
        assert data_file.nan_value_counts == {10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, 16: 0, 17: 0, 18: 0, 19: 0}
assert data_file.lower_bounds == {
2: b"\x01\x00\x00\x00\x00\x00\x00\x00",
3: b"\x01\x00\x00\x00\x00\x00\x00\x00",
7: b"\x03\x00\x00\x00",
8: b"\x01\x00\x00\x00",
10: b"\xf6(\\\x8f\xc2\x05S\xc0",
11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
15: b")\\\x8f\xc2\xf5(\x08\xc0",
16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
}
assert data_file.upper_bounds == {
2: b"\x06\x00\x00\x00\x00\x00\x00\x00",
3: b"\x06\x00\x00\x00\x00\x00\x00\x00",
7: b"\t\x01\x00\x00",
8: b"\t\x01\x00\x00",
10: b"\xcd\xcc\xcc\xcc\xcc,_@",
11: b"\x1f\x85\xebQ\\\xe2\xfe@",
13: b"\x00\x00\x00\x00\x00\x00\x12@",
14: b"\x00\x00\x00\x00\x00\x00\xe0?",
15: b"q=\n\xd7\xa3\xf01@",
16: b"\x00\x00\x00\x00\x00`B@",
17: b"333333\xd3?",
18: b"\x00\x00\x00\x00\x00\x18b@",
19: b"\x00\x00\x00\x00\x00\x00\x04@",
}
assert data_file.key_metadata is None
assert data_file.split_offsets == [4]
assert data_file.equality_ids is None
assert data_file.sort_order_id == 0
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize("parent_snapshot_id", [19, None])
@pytest.mark.parametrize("compression", ["null", "deflate"])
def test_write_manifest_list(
generated_manifest_file_file_v1: str,
generated_manifest_file_file_v2: str,
format_version: TableVersion,
parent_snapshot_id: int | None,
compression: AvroCompressionCodec,
) -> None:
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=parent_snapshot_id,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)
demo_manifest_list = snapshot.manifests(io)
with TemporaryDirectory() as tmp_dir:
path = tmp_dir + "/manifest-list.avro"
output = io.new_output(path)
with write_manifest_list(
format_version=format_version,
output_file=output,
snapshot_id=25,
parent_snapshot_id=parent_snapshot_id,
sequence_number=0,
avro_compression=compression,
) as writer:
writer.add_manifests(demo_manifest_list)
new_manifest_list = list(read_manifest_list(io.new_input(path)))
        if parent_snapshot_id is not None:
expected_metadata = {"snapshot-id": "25", "parent-snapshot-id": "19", "format-version": str(format_version)}
else:
expected_metadata = {"snapshot-id": "25", "parent-snapshot-id": "null", "format-version": str(format_version)}
if format_version == 2:
expected_metadata["sequence-number"] = "0"
_verify_metadata_with_fastavro(path, expected_metadata)
manifest_file = new_manifest_list[0]
assert manifest_file.manifest_length == 7989
assert manifest_file.partition_spec_id == 0
        assert manifest_file.content == (ManifestContent.DATA if format_version == 1 else ManifestContent.DELETES)
        assert manifest_file.sequence_number == (0 if format_version == 1 else 3)
        assert manifest_file.min_sequence_number == (0 if format_version == 1 else 3)
assert manifest_file.added_snapshot_id == 9182715666859759686
assert manifest_file.added_files_count == 3
assert manifest_file.existing_files_count == 0
assert manifest_file.deleted_files_count == 0
assert manifest_file.added_rows_count == 237993
assert manifest_file.existing_rows_count == 0
assert manifest_file.deleted_rows_count == 0
assert manifest_file.key_metadata is None
assert isinstance(manifest_file.partitions, list)
partition = manifest_file.partitions[0]
assert isinstance(partition, PartitionFieldSummary)
assert partition.contains_null is True
assert partition.contains_nan is False
assert partition.lower_bound == b"\x01\x00\x00\x00"
assert partition.upper_bound == b"\x02\x00\x00\x00"
entries = manifest_file.fetch_manifest_entry(io)
assert isinstance(entries, list)
entry = entries[0]
        assert entry.sequence_number == (0 if format_version == 1 else 3)
        assert entry.file_sequence_number == (0 if format_version == 1 else 3)
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
@pytest.mark.parametrize(
"raw_file_format,expected_file_format",
[
("avro", FileFormat("AVRO")),
("AVRO", FileFormat("AVRO")),
("parquet", FileFormat("PARQUET")),
("PARQUET", FileFormat("PARQUET")),
("orc", FileFormat("ORC")),
("ORC", FileFormat("ORC")),
("NOT_EXISTS", None),
],
)
def test_file_format_case_insensitive(raw_file_format: str, expected_file_format: FileFormat | None) -> None:
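    # FileFormat lookup is case-insensitive: "avro" and "AVRO" both resolve to
    # FileFormat.AVRO, while unknown names raise ValueError.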
    if expected_file_format is not None:
parsed_file_format = FileFormat(raw_file_format)
assert parsed_file_format == expected_file_format, (
f"File format {raw_file_format}: {parsed_file_format} != {expected_file_format}"
)
else:
with pytest.raises(ValueError):
_ = FileFormat(raw_file_format)
def test_manifest_cache_deduplicates_manifest_files() -> None:
"""Test that the manifest cache deduplicates ManifestFile objects across manifest lists.
This test verifies the fix for https://github.com/apache/iceberg-python/issues/2325
The issue was that when caching manifest lists by their path, overlapping ManifestFile
objects were duplicated. For example:
- ManifestList1: (ManifestFile1)
- ManifestList2: (ManifestFile1, ManifestFile2)
- ManifestList3: (ManifestFile1, ManifestFile2, ManifestFile3)
With the old approach, ManifestFile1 was stored 3 times in the cache.
With the new approach, ManifestFile objects are cached individually by their
manifest_path, so ManifestFile1 is stored only once and reused.
"""
io = PyArrowFileIO()
with TemporaryDirectory() as tmp_dir:
# Create three manifest files to simulate manifests created during appends
manifest1_path = f"{tmp_dir}/manifest1.avro"
manifest2_path = f"{tmp_dir}/manifest2.avro"
manifest3_path = f"{tmp_dir}/manifest3.avro"
schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
spec = UNPARTITIONED_PARTITION_SPEC
# Create manifest file 1
with write_manifest(
format_version=2,
spec=spec,
schema=schema,
output_file=io.new_output(manifest1_path),
snapshot_id=1,
avro_compression="zstandard",
) as writer:
data_file1 = DataFile.from_args(
content=DataFileContent.DATA,
file_path=f"{tmp_dir}/data1.parquet",
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=100,
file_size_in_bytes=1000,
)
writer.add_entry(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=1,
data_file=data_file1,
)
)
manifest_file1 = writer.to_manifest_file()
# Create manifest file 2
with write_manifest(
format_version=2,
spec=spec,
schema=schema,
output_file=io.new_output(manifest2_path),
snapshot_id=2,
avro_compression="zstandard",
) as writer:
data_file2 = DataFile.from_args(
content=DataFileContent.DATA,
file_path=f"{tmp_dir}/data2.parquet",
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=200,
file_size_in_bytes=2000,
)
writer.add_entry(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=2,
data_file=data_file2,
)
)
manifest_file2 = writer.to_manifest_file()
# Create manifest file 3
with write_manifest(
format_version=2,
spec=spec,
schema=schema,
output_file=io.new_output(manifest3_path),
snapshot_id=3,
avro_compression="zstandard",
) as writer:
data_file3 = DataFile.from_args(
content=DataFileContent.DATA,
file_path=f"{tmp_dir}/data3.parquet",
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=300,
file_size_in_bytes=3000,
)
writer.add_entry(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=3,
data_file=data_file3,
)
)
manifest_file3 = writer.to_manifest_file()
# Create manifest list 1: contains only manifest1
manifest_list1_path = f"{tmp_dir}/manifest-list1.avro"
with write_manifest_list(
format_version=2,
output_file=io.new_output(manifest_list1_path),
snapshot_id=1,
parent_snapshot_id=None,
sequence_number=1,
avro_compression="zstandard",
) as list_writer:
list_writer.add_manifests([manifest_file1])
# Create manifest list 2: contains manifest1 and manifest2 (overlapping manifest1)
manifest_list2_path = f"{tmp_dir}/manifest-list2.avro"
with write_manifest_list(
format_version=2,
output_file=io.new_output(manifest_list2_path),
snapshot_id=2,
parent_snapshot_id=1,
sequence_number=2,
avro_compression="zstandard",
) as list_writer:
list_writer.add_manifests([manifest_file1, manifest_file2])
# Create manifest list 3: contains all three manifests (overlapping manifest1 and manifest2)
manifest_list3_path = f"{tmp_dir}/manifest-list3.avro"
with write_manifest_list(
format_version=2,
output_file=io.new_output(manifest_list3_path),
snapshot_id=3,
parent_snapshot_id=2,
sequence_number=3,
avro_compression="zstandard",
) as list_writer:
list_writer.add_manifests([manifest_file1, manifest_file2, manifest_file3])
# Read all three manifest lists
manifests1 = _manifests(io, manifest_list1_path)
manifests2 = _manifests(io, manifest_list2_path)
manifests3 = _manifests(io, manifest_list3_path)
# Verify the manifest files have the expected paths
assert len(manifests1) == 1
assert len(manifests2) == 2
assert len(manifests3) == 3
# Verify that ManifestFile objects with the same manifest_path are the same object (identity)
# This is the key assertion - if caching works correctly, the same ManifestFile
# object should be reused instead of creating duplicates
# manifest_file1 appears in all three lists - should be the same object
assert manifests1[0] is manifests2[0], "ManifestFile1 should be the same object instance across manifest lists"
assert manifests2[0] is manifests3[0], "ManifestFile1 should be the same object instance across manifest lists"
# manifest_file2 appears in lists 2 and 3 - should be the same object
assert manifests2[1] is manifests3[1], "ManifestFile2 should be the same object instance across manifest lists"
# Verify cache size - should only have 3 unique ManifestFile objects
# instead of 1 + 2 + 3 = 6 objects as with the old approach
assert len(_manifest_cache) == 3, (
f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}"
)
def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
"""Test that the manifest cache remains efficient with many overlapping manifest lists.
This simulates the scenario from GitHub issue #2325 where many appends create
manifest lists that increasingly overlap.
"""
io = PyArrowFileIO()
with TemporaryDirectory() as tmp_dir:
schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
spec = UNPARTITIONED_PARTITION_SPEC
num_manifests = 10
manifest_files = []
# Create N manifest files
for i in range(num_manifests):
manifest_path = f"{tmp_dir}/manifest{i}.avro"
with write_manifest(
format_version=2,
spec=spec,
schema=schema,
output_file=io.new_output(manifest_path),
snapshot_id=i + 1,
avro_compression="zstandard",
) as writer:
data_file = DataFile.from_args(
content=DataFileContent.DATA,
file_path=f"{tmp_dir}/data{i}.parquet",
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=100 * (i + 1),
file_size_in_bytes=1000 * (i + 1),
)
writer.add_entry(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=i + 1,
data_file=data_file,
)
)
manifest_files.append(writer.to_manifest_file())
# Create N manifest lists, each containing an increasing number of manifests
# list[i] contains manifests[0:i+1]
manifest_list_paths = []
for i in range(num_manifests):
list_path = f"{tmp_dir}/manifest-list{i}.avro"
with write_manifest_list(
format_version=2,
output_file=io.new_output(list_path),
snapshot_id=i + 1,
parent_snapshot_id=i if i > 0 else None,
sequence_number=i + 1,
avro_compression="zstandard",
) as list_writer:
list_writer.add_manifests(manifest_files[: i + 1])
manifest_list_paths.append(list_path)
# Read all manifest lists
all_results = []
for path in manifest_list_paths:
result = _manifests(io, path)
all_results.append(result)
# With the old cache approach, we would have:
# 1 + 2 + 3 + ... + N = N*(N+1)/2 ManifestFile objects in memory
# With the new approach, we should have exactly N objects
# Verify cache has exactly N unique entries
assert len(_manifest_cache) == num_manifests, (
f"Cache should contain exactly {num_manifests} ManifestFile objects, "
f"but has {len(_manifest_cache)}. "
f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects."
)
# Verify object identity - all references to the same manifest should be the same object
for i in range(num_manifests):
# Find all references to this manifest across all results
references = []
for j, result in enumerate(all_results):
if j >= i: # This manifest should be in lists from i onwards
references.append(result[i])
# All references should be the same object
if len(references) > 1:
for ref in references[1:]:
assert ref is references[0], f"All references to manifest {i} should be the same object instance"
@pytest.mark.parametrize("format_version", [1, 2])
def test_manifest_writer_tell(format_version: TableVersion) -> None:
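    # tell() is expected to report the writer's current byte position, so the
    # value should grow after an entry is added.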
io = load_file_io()
test_schema = Schema(NestedField(1, "foo", IntegerType(), False))
with TemporaryDirectory() as tmpdir:
output_file = io.new_output(f"{tmpdir}/test-manifest.avro")
with write_manifest(
format_version=format_version,
spec=UNPARTITIONED_PARTITION_SPEC,
schema=test_schema,
output_file=output_file,
snapshot_id=1,
avro_compression="null",
) as writer:
initial_bytes = writer.tell()
data_file = DataFile.from_args(
content=DataFileContent.DATA,
file_path=f"{tmpdir}/data.parquet",
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=100,
file_size_in_bytes=1000,
)
entry = ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=1,
data_file=data_file,
)
writer.add_entry(entry)
after_entry_bytes = writer.tell()
assert after_entry_bytes > initial_bytes, "Bytes should increase after adding entry"