# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
from tempfile import TemporaryDirectory
from typing import Dict, Literal

import fastavro
import pytest

from pyiceberg.io import load_file_io
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import (
DataFile,
DataFileContent,
FileFormat,
ManifestContent,
ManifestEntryStatus,
ManifestFile,
PartitionFieldSummary,
read_manifest_list,
write_manifest,
write_manifest_list,
)
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import Record
from pyiceberg.types import IntegerType, NestedField


def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None:
    """Read the Avro file metadata with fastavro and check that every expected key/value pair is present."""
    with open(avro_file, "rb") as f:
        reader = fastavro.reader(f)
        metadata = reader.metadata
        for k, v in expected_metadata.items():
            assert k in metadata
            assert metadata[k] == v


def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
    """Read a generated manifest file and verify the first entry's data file metadata and column statistics."""
manifest = ManifestFile(
manifest_path=generated_manifest_entry_file,
manifest_length=0,
partition_spec_id=0,
added_snapshot_id=0,
sequence_number=0,
partitions=[],
)
manifest_entries = manifest.fetch_manifest_entry(PyArrowFileIO())
manifest_entry = manifest_entries[0]
assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
assert manifest_entry.data_sequence_number == 0
assert isinstance(manifest_entry.data_file, DataFile)
data_file = manifest_entry.data_file
assert data_file.content is DataFileContent.DATA
assert (
data_file.file_path
== "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
assert data_file.file_format == FileFormat.PARQUET
assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=1925]"
assert data_file.record_count == 19513
assert data_file.file_size_in_bytes == 388872
assert data_file.column_sizes == {
1: 53,
2: 98153,
3: 98693,
4: 53,
5: 53,
6: 53,
7: 17425,
8: 18528,
9: 53,
10: 44788,
11: 35571,
12: 53,
13: 1243,
14: 2355,
15: 12750,
16: 4029,
17: 110,
18: 47194,
19: 2948,
}
assert data_file.value_counts == {
1: 19513,
2: 19513,
3: 19513,
4: 19513,
5: 19513,
6: 19513,
7: 19513,
8: 19513,
9: 19513,
10: 19513,
11: 19513,
12: 19513,
13: 19513,
14: 19513,
15: 19513,
16: 19513,
17: 19513,
18: 19513,
19: 19513,
}
assert data_file.null_value_counts == {
1: 19513,
2: 0,
3: 0,
4: 19513,
5: 19513,
6: 19513,
7: 0,
8: 0,
9: 19513,
10: 0,
11: 0,
12: 19513,
13: 0,
14: 0,
15: 0,
16: 0,
17: 0,
18: 0,
19: 0,
}
assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
assert data_file.lower_bounds == {
2: b"2020-04-01 00:00",
3: b"2020-04-01 00:12",
7: b"\x03\x00\x00\x00",
8: b"\x01\x00\x00\x00",
10: b"\xf6(\\\x8f\xc2\x05S\xc0",
11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
15: b")\\\x8f\xc2\xf5(\x08\xc0",
16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
}
assert data_file.upper_bounds == {
2: b"2020-04-30 23:5:",
3: b"2020-05-01 00:41",
7: b"\t\x01\x00\x00",
8: b"\t\x01\x00\x00",
10: b"\xcd\xcc\xcc\xcc\xcc,_@",
11: b"\x1f\x85\xebQ\\\xe2\xfe@",
13: b"\x00\x00\x00\x00\x00\x00\x12@",
14: b"\x00\x00\x00\x00\x00\x00\xe0?",
15: b"q=\n\xd7\xa3\xf01@",
16: b"\x00\x00\x00\x00\x00`B@",
17: b"333333\xd3?",
18: b"\x00\x00\x00\x00\x00\x18b@",
19: b"\x00\x00\x00\x00\x00\x00\x04@",
}
assert data_file.key_metadata is None
assert data_file.split_offsets == [4]
assert data_file.equality_ids is None
assert data_file.sort_order_id == 0


def test_read_manifest_list(generated_manifest_file_file_v1: str) -> None:
    """Read a v1 manifest list and verify the manifest file counts and partition field summary."""
input_file = PyArrowFileIO().new_input(generated_manifest_file_file_v1)
manifest_list = list(read_manifest_list(input_file))[0]
assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
assert manifest_list.added_snapshot_id == 9182715666859759686
assert manifest_list.added_files_count == 3
assert manifest_list.existing_files_count == 0
assert manifest_list.deleted_files_count == 0
assert isinstance(manifest_list.partitions, list)
partitions_summary = manifest_list.partitions[0]
assert isinstance(partitions_summary, PartitionFieldSummary)
assert partitions_summary.contains_null is True
assert partitions_summary.contains_nan is False
assert partitions_summary.lower_bound == b"\x01\x00\x00\x00"
assert partitions_summary.upper_bound == b"\x02\x00\x00\x00"
assert manifest_list.added_rows_count == 237993
assert manifest_list.existing_rows_count == 0
assert manifest_list.deleted_rows_count == 0


def test_read_manifest_v1(generated_manifest_file_file_v1: str) -> None:
    """Load a v1 manifest list through a Snapshot and verify the manifest file and its entries."""
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v1,
summary=Summary(Operation.APPEND),
schema_id=3,
)
manifest_list = snapshot.manifests(io)[0]
assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
assert manifest_list.content == ManifestContent.DATA
assert manifest_list.sequence_number == 0
assert manifest_list.min_sequence_number == 0
assert manifest_list.added_snapshot_id == 9182715666859759686
assert manifest_list.added_files_count == 3
assert manifest_list.existing_files_count == 0
assert manifest_list.deleted_files_count == 0
assert manifest_list.added_rows_count == 237993
assert manifest_list.existing_rows_count == 0
assert manifest_list.deleted_rows_count == 0
assert manifest_list.key_metadata is None
assert isinstance(manifest_list.partitions, list)
partition = manifest_list.partitions[0]
assert isinstance(partition, PartitionFieldSummary)
assert partition.contains_null is True
assert partition.contains_nan is False
assert partition.lower_bound == b"\x01\x00\x00\x00"
assert partition.upper_bound == b"\x02\x00\x00\x00"
entries = manifest_list.fetch_manifest_entry(io)
assert isinstance(entries, list)
entry = entries[0]
assert entry.data_sequence_number == 0
assert entry.file_sequence_number == 0
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED


def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
    """Load a v2 manifest list through a Snapshot and verify the manifest file and its entries."""
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)
manifest_list = snapshot.manifests(io)[0]
assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
assert manifest_list.content == ManifestContent.DELETES
assert manifest_list.sequence_number == 3
assert manifest_list.min_sequence_number == 3
assert manifest_list.added_snapshot_id == 9182715666859759686
assert manifest_list.added_files_count == 3
assert manifest_list.existing_files_count == 0
assert manifest_list.deleted_files_count == 0
assert manifest_list.added_rows_count == 237993
assert manifest_list.existing_rows_count == 0
assert manifest_list.deleted_rows_count == 0
assert manifest_list.key_metadata is None
assert isinstance(manifest_list.partitions, list)
partition = manifest_list.partitions[0]
assert isinstance(partition, PartitionFieldSummary)
assert partition.contains_null is True
assert partition.contains_nan is False
assert partition.lower_bound == b"\x01\x00\x00\x00"
assert partition.upper_bound == b"\x02\x00\x00\x00"
entries = manifest_list.fetch_manifest_entry(io)
assert isinstance(entries, list)
entry = entries[0]
assert entry.data_sequence_number == 3
assert entry.file_sequence_number == 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED


@pytest.mark.parametrize("format_version", [1, 2])
def test_write_manifest(
    generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: Literal[1, 2]
) -> None:
    """Round-trip manifest entries through write_manifest and verify the Avro metadata and re-read entries."""
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)
demo_manifest_file = snapshot.manifests(io)[0]
manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
test_schema = Schema(
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
)
test_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
spec_id=demo_manifest_file.partition_spec_id,
)
with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/test_write_manifest.avro"
output = io.new_output(tmp_avro_file)
with write_manifest(
format_version=format_version,
spec=test_spec,
schema=test_schema,
output_file=output,
snapshot_id=8744736658442914487,
) as writer:
for entry in manifest_entries:
writer.add_entry(entry)
new_manifest = writer.to_manifest_file()
with pytest.raises(RuntimeError):
# It is already closed
writer.add_entry(manifest_entries[0])
expected_metadata = {
"schema": test_schema.model_dump_json(),
"partition-spec": test_spec.model_dump_json(),
"partition-spec-id": str(test_spec.spec_id),
"format-version": str(format_version),
}
_verify_metadata_with_fastavro(
tmp_avro_file,
expected_metadata,
)
new_manifest_entries = new_manifest.fetch_manifest_entry(io)
manifest_entry = new_manifest_entries[0]
assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
        assert manifest_entry.data_sequence_number == (-1 if format_version == 1 else 3)
assert isinstance(manifest_entry.data_file, DataFile)
data_file = manifest_entry.data_file
assert data_file.content is DataFileContent.DATA
assert (
data_file.file_path
== "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
assert data_file.file_format == FileFormat.PARQUET
assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925)
assert data_file.record_count == 19513
assert data_file.file_size_in_bytes == 388872
assert data_file.column_sizes == {
1: 53,
2: 98153,
3: 98693,
4: 53,
5: 53,
6: 53,
7: 17425,
8: 18528,
9: 53,
10: 44788,
11: 35571,
12: 53,
13: 1243,
14: 2355,
15: 12750,
16: 4029,
17: 110,
18: 47194,
19: 2948,
}
assert data_file.value_counts == {
1: 19513,
2: 19513,
3: 19513,
4: 19513,
5: 19513,
6: 19513,
7: 19513,
8: 19513,
9: 19513,
10: 19513,
11: 19513,
12: 19513,
13: 19513,
14: 19513,
15: 19513,
16: 19513,
17: 19513,
18: 19513,
19: 19513,
}
assert data_file.null_value_counts == {
1: 19513,
2: 0,
3: 0,
4: 19513,
5: 19513,
6: 19513,
7: 0,
8: 0,
9: 19513,
10: 0,
11: 0,
12: 19513,
13: 0,
14: 0,
15: 0,
16: 0,
17: 0,
18: 0,
19: 0,
}
assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
assert data_file.lower_bounds == {
2: b"2020-04-01 00:00",
3: b"2020-04-01 00:12",
7: b"\x03\x00\x00\x00",
8: b"\x01\x00\x00\x00",
10: b"\xf6(\\\x8f\xc2\x05S\xc0",
11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
15: b")\\\x8f\xc2\xf5(\x08\xc0",
16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
}
assert data_file.upper_bounds == {
2: b"2020-04-30 23:5:",
3: b"2020-05-01 00:41",
7: b"\t\x01\x00\x00",
8: b"\t\x01\x00\x00",
10: b"\xcd\xcc\xcc\xcc\xcc,_@",
11: b"\x1f\x85\xebQ\\\xe2\xfe@",
13: b"\x00\x00\x00\x00\x00\x00\x12@",
14: b"\x00\x00\x00\x00\x00\x00\xe0?",
15: b"q=\n\xd7\xa3\xf01@",
16: b"\x00\x00\x00\x00\x00`B@",
17: b"333333\xd3?",
18: b"\x00\x00\x00\x00\x00\x18b@",
19: b"\x00\x00\x00\x00\x00\x00\x04@",
}
assert data_file.key_metadata is None
assert data_file.split_offsets == [4]
assert data_file.equality_ids is None
assert data_file.sort_order_id == 0


@pytest.mark.parametrize("format_version", [1, 2])
def test_write_manifest_list(
    generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: Literal[1, 2]
) -> None:
    """Round-trip manifest files through write_manifest_list and verify the Avro metadata and re-read manifests."""
io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)
demo_manifest_list = snapshot.manifests(io)
with TemporaryDirectory() as tmp_dir:
path = tmp_dir + "/manifest-list.avro"
output = io.new_output(path)
with write_manifest_list(
format_version=format_version, output_file=output, snapshot_id=25, parent_snapshot_id=19, sequence_number=0
) as writer:
writer.add_manifests(demo_manifest_list)
new_manifest_list = list(read_manifest_list(io.new_input(path)))
expected_metadata = {"snapshot-id": "25", "parent-snapshot-id": "19", "format-version": str(format_version)}
if format_version == 2:
expected_metadata["sequence-number"] = "0"
_verify_metadata_with_fastavro(path, expected_metadata)
manifest_file = new_manifest_list[0]
assert manifest_file.manifest_length == 7989
assert manifest_file.partition_spec_id == 0
        assert manifest_file.content == (ManifestContent.DATA if format_version == 1 else ManifestContent.DELETES)
        assert manifest_file.sequence_number == (0 if format_version == 1 else 3)
        assert manifest_file.min_sequence_number == (0 if format_version == 1 else 3)
assert manifest_file.added_snapshot_id == 9182715666859759686
assert manifest_file.added_files_count == 3
assert manifest_file.existing_files_count == 0
assert manifest_file.deleted_files_count == 0
assert manifest_file.added_rows_count == 237993
assert manifest_file.existing_rows_count == 0
assert manifest_file.deleted_rows_count == 0
assert manifest_file.key_metadata is None
assert isinstance(manifest_file.partitions, list)
partition = manifest_file.partitions[0]
assert isinstance(partition, PartitionFieldSummary)
assert partition.contains_null is True
assert partition.contains_nan is False
assert partition.lower_bound == b"\x01\x00\x00\x00"
assert partition.upper_bound == b"\x02\x00\x00\x00"
entries = manifest_file.fetch_manifest_entry(io)
assert isinstance(entries, list)
entry = entries[0]
        assert entry.data_sequence_number == (0 if format_version == 1 else 3)
        assert entry.file_sequence_number == (0 if format_version == 1 else 3)
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED