| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import time |
| from enum import Enum |
| from typing import ( |
| Any, |
| Dict, |
| List, |
| Mapping, |
| Optional, |
| ) |
| |
| from pydantic import Field, PrivateAttr, model_serializer |
| |
| from pyiceberg.io import FileIO |
| from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list |
| from pyiceberg.typedef import IcebergBaseModel |
| |
# Keys of the string properties stored in a snapshot summary.

# Counters for what a snapshot added.
ADDED_DATA_FILES = "added-data-files"
ADDED_DELETE_FILES = "added-delete-files"
ADDED_EQUALITY_DELETE_FILES = "added-equality-delete-files"
ADDED_POSITION_DELETE_FILES = "added-position-delete-files"
ADDED_RECORDS = "added-records"
ADDED_EQUALITY_DELETES = "added-equality-deletes"
ADDED_POSITION_DELETES = "added-position-deletes"
ADDED_FILE_SIZE = "added-files-size"

# Counters for what a snapshot removed. Data files and records use the
# "deleted-" prefix; everything else uses "removed-".
DELETED_DATA_FILES = "deleted-data-files"
REMOVED_DELETE_FILES = "removed-delete-files"
REMOVED_EQUALITY_DELETE_FILES = "removed-equality-delete-files"
REMOVED_POSITION_DELETE_FILES = "removed-position-delete-files"
DELETED_RECORDS = "deleted-records"
REMOVED_EQUALITY_DELETES = "removed-equality-deletes"
REMOVED_POSITION_DELETES = "removed-position-deletes"
REMOVED_FILE_SIZE = "removed-files-size"

# Running totals carried from one snapshot summary to the next.
TOTAL_DATA_FILES = "total-data-files"
TOTAL_DELETE_FILES = "total-delete-files"
TOTAL_RECORDS = "total-records"
TOTAL_FILE_SIZE = "total-files-size"
TOTAL_POSITION_DELETES = "total-position-deletes"
TOTAL_EQUALITY_DELETES = "total-equality-deletes"

# Key under which the summary's operation is stored.
OPERATION = "operation"
| |
| |
class Operation(Enum):
    """Enumerate the kinds of change a snapshot can record.

    Members:
        APPEND: Only data files were added and no files were removed.
        REPLACE: Data and delete files were added and removed without changing
            table data; i.e., compaction, changing the data file format, or
            relocating data files.
        OVERWRITE: Data and delete files were added and removed in a logical
            overwrite operation.
        DELETE: Data files were removed and their contents logically deleted
            and/or delete files were added to delete rows.
    """

    APPEND = "append"
    REPLACE = "replace"
    OVERWRITE = "overwrite"
    DELETE = "delete"

    def __repr__(self) -> str:
        """Render the member as ``Operation.<NAME>``."""
        return "Operation." + self.name
| |
| |
class Summary(IcebergBaseModel, Mapping[str, str]):
    """Hold a Snapshot's summary: one operation plus free-form properties.

    The snapshot summary's operation field is used by some operations,
    like snapshot expiration, to skip processing certain snapshots.
    """

    operation: Operation = Field()
    _additional_properties: Dict[str, str] = PrivateAttr()

    def __init__(self, operation: Operation, **data: Any) -> None:
        """Create a summary from an operation and any extra keyword properties."""
        super().__init__(operation=operation, **data)
        # Every keyword other than ``operation`` is kept as an additional property.
        self._additional_properties = data

    def __getitem__(self, __key: str) -> Optional[Any]:  # type: ignore
        """Look up a key map-style; keys that are not present yield None."""
        # The operation key is matched case-insensitively.
        if __key.lower() != "operation":
            return self._additional_properties.get(__key)
        return self.operation

    def __setitem__(self, key: str, value: Any) -> None:
        """Assign a key map-style, routing ``operation`` to the model field."""
        if key.lower() != "operation":
            self._additional_properties[key] = value
            return
        self.operation = value

    def __len__(self) -> int:
        """Return the number of keys in the summary."""
        # The extra one accounts for the always-present operation.
        return len(self._additional_properties) + 1

    @model_serializer
    def ser_model(self) -> Dict[str, str]:
        """Serialize to a flat dict: the operation value, then the additional properties."""
        serialized = {"operation": str(self.operation.value)}
        serialized.update(self._additional_properties)
        return serialized

    @property
    def additional_properties(self) -> Dict[str, str]:
        """Return the dict holding every property other than the operation."""
        return self._additional_properties

    def __repr__(self) -> str:
        """Return the string representation of the Summary class."""
        extras = self._additional_properties
        # The properties are rendered as ``**{...}`` only when there are any.
        suffix = f", **{extras!r}" if extras else ""
        return f"Summary({self.operation!r}{suffix})"

    def __eq__(self, other: Any) -> bool:
        """Compare operation and additional properties; non-Summary objects are unequal."""
        if not isinstance(other, Summary):
            return False
        return self.operation == other.operation and self.additional_properties == other.additional_properties
| |
| |
class Snapshot(IcebergBaseModel):
    """Model of a snapshot: ids, timestamp, manifest list location, summary and schema id.

    Only ``snapshot_id`` has no default. ``timestamp_ms`` defaults to the
    current wall-clock time in milliseconds; the remaining fields default to None.
    """

    snapshot_id: int = Field(alias="snapshot-id")
    parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
    sequence_number: Optional[int] = Field(alias="sequence-number", default=None)
    timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000))
    manifest_list: Optional[str] = Field(
        alias="manifest-list", description="Location of the snapshot's manifest list file", default=None
    )
    summary: Optional[Summary] = Field(default=None)
    schema_id: Optional[int] = Field(alias="schema-id", default=None)

    def __str__(self) -> str:
        """Return the string representation of the Snapshot class.

        The result has the form ``<operation>: id=<id>, parent_id=<id>, schema_id=<id>``,
        where the operation prefix and the parent/schema parts appear only when set.
        """
        operation = f"{self.summary.operation}: " if self.summary else ""
        # Test against None rather than truthiness so that an id of 0 is still
        # printed, consistent with how schema_id is handled on the next line.
        parent_id = f", parent_id={self.parent_snapshot_id}" if self.parent_snapshot_id is not None else ""
        schema_id = f", schema_id={self.schema_id}" if self.schema_id is not None else ""
        return f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"

    def manifests(self, io: FileIO) -> List[ManifestFile]:
        """Read the manifests listed in this snapshot's manifest list.

        Args:
            io: FileIO used to open ``manifest_list`` as an input file.

        Returns:
            The entries produced by ``read_manifest_list`` as a list, or an
            empty list when the snapshot has no manifest list.
        """
        if self.manifest_list is not None:
            file = io.new_input(self.manifest_list)
            return list(read_manifest_list(file))
        return []
| |
| |
class MetadataLogEntry(IcebergBaseModel):
    """Model with two fields without defaults: a metadata file and a timestamp."""

    # String identifying the metadata file, declared with the alias "metadata-file".
    # NOTE(review): presumably the location of a table metadata file — confirm against callers.
    metadata_file: str = Field(alias="metadata-file")
    # Integer timestamp, declared with the alias "timestamp-ms".
    # NOTE(review): presumably epoch milliseconds, like Snapshot.timestamp_ms — confirm.
    timestamp_ms: int = Field(alias="timestamp-ms")
| |
| |
class SnapshotLogEntry(IcebergBaseModel):
    """Model with two fields without defaults: a snapshot id and a timestamp."""

    # Integer snapshot identifier, declared with the alias "snapshot-id".
    snapshot_id: int = Field(alias="snapshot-id")
    # Integer timestamp, declared with the alias "timestamp-ms".
    # NOTE(review): presumably epoch milliseconds, like Snapshot.timestamp_ms — confirm.
    timestamp_ms: int = Field(alias="timestamp-ms")
| |
| |
class SnapshotSummaryCollector:
    """Accumulate counters for the files added to and removed from a snapshot.

    Files are fed in through ``add_file`` and ``remove_file``; ``build`` then
    renders every counter that is greater than zero as a string property.
    """

    added_file_size: int
    removed_file_size: int
    added_data_files: int
    removed_data_files: int
    added_eq_delete_files: int
    removed_eq_delete_files: int
    added_pos_delete_files: int
    removed_pos_delete_files: int
    added_delete_files: int
    removed_delete_files: int
    added_records: int
    deleted_records: int
    added_pos_deletes: int
    removed_pos_deletes: int
    added_eq_deletes: int
    removed_eq_deletes: int

    def __init__(self) -> None:
        """Start every counter at zero."""
        # Byte sizes.
        self.added_file_size = self.removed_file_size = 0
        # File counts, per kind of file.
        self.added_data_files = self.removed_data_files = 0
        self.added_eq_delete_files = self.removed_eq_delete_files = 0
        self.added_pos_delete_files = self.removed_pos_delete_files = 0
        self.added_delete_files = self.removed_delete_files = 0
        # Record counts, per kind of file.
        self.added_records = self.deleted_records = 0
        self.added_pos_deletes = self.removed_pos_deletes = 0
        self.added_eq_deletes = self.removed_eq_deletes = 0

    def add_file(self, data_file: DataFile) -> None:
        """Count ``data_file`` as added.

        Raises:
            ValueError: If the file's content is not data, position deletes
                or equality deletes.
        """
        # The size is accumulated before the content type is inspected.
        self.added_file_size += data_file.file_size_in_bytes

        content = data_file.content
        if content == DataFileContent.DATA:
            self.added_data_files += 1
            self.added_records += data_file.record_count
            return
        if content == DataFileContent.POSITION_DELETES:
            self.added_delete_files += 1
            self.added_pos_delete_files += 1
            self.added_pos_deletes += data_file.record_count
            return
        if content == DataFileContent.EQUALITY_DELETES:
            self.added_delete_files += 1
            self.added_eq_delete_files += 1
            self.added_eq_deletes += data_file.record_count
            return
        raise ValueError(f"Unknown data file content: {content}")

    def remove_file(self, data_file: DataFile) -> None:
        """Count ``data_file`` as removed.

        Raises:
            ValueError: If the file's content is not data, position deletes
                or equality deletes.
        """
        # The size is accumulated before the content type is inspected.
        self.removed_file_size += data_file.file_size_in_bytes

        content = data_file.content
        if content == DataFileContent.DATA:
            self.removed_data_files += 1
            self.deleted_records += data_file.record_count
            return
        if content == DataFileContent.POSITION_DELETES:
            self.removed_delete_files += 1
            self.removed_pos_delete_files += 1
            self.removed_pos_deletes += data_file.record_count
            return
        if content == DataFileContent.EQUALITY_DELETES:
            self.removed_delete_files += 1
            self.removed_eq_delete_files += 1
            self.removed_eq_deletes += data_file.record_count
            return
        raise ValueError(f"Unknown data file content: {content}")

    def build(self) -> Dict[str, str]:
        """Return the counters that are greater than zero as string properties."""
        # The order of this table is the key order of the returned dict.
        counters = (
            (ADDED_FILE_SIZE, self.added_file_size),
            (REMOVED_FILE_SIZE, self.removed_file_size),
            (ADDED_DATA_FILES, self.added_data_files),
            (DELETED_DATA_FILES, self.removed_data_files),
            (ADDED_EQUALITY_DELETE_FILES, self.added_eq_delete_files),
            (REMOVED_EQUALITY_DELETE_FILES, self.removed_eq_delete_files),
            (ADDED_POSITION_DELETE_FILES, self.added_pos_delete_files),
            (REMOVED_POSITION_DELETE_FILES, self.removed_pos_delete_files),
            (ADDED_DELETE_FILES, self.added_delete_files),
            (REMOVED_DELETE_FILES, self.removed_delete_files),
            (ADDED_RECORDS, self.added_records),
            (DELETED_RECORDS, self.deleted_records),
            (ADDED_POSITION_DELETES, self.added_pos_deletes),
            (REMOVED_POSITION_DELETES, self.removed_pos_deletes),
            (ADDED_EQUALITY_DELETES, self.added_eq_deletes),
            (REMOVED_EQUALITY_DELETES, self.removed_eq_deletes),
        )
        return {property_name: str(count) for property_name, count in counters if count > 0}
| |
| |
def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
    """Rewrite ``summary`` so that it records the removal of everything in ``previous_summary``.

    Every total in ``summary`` is set to ``'0'``. Then, for each total in
    ``previous_summary`` that parses to a non-zero integer, the matching
    removed/deleted property in ``summary`` is set to that value.

    Args:
        summary: The summary to update; it is modified in place.
        previous_summary: Source of the previous totals. Missing or empty
            values are treated as ``'0'``.

    Returns:
        The same ``summary`` object that was passed in.

    Raises:
        ValueError: If a previous total cannot be parsed as an int.
    """
    # (total property, property recording how much of that total was removed).
    # An ordered tuple rather than a set, so the keys are inserted into the
    # summary in the same order on every run regardless of string hashing.
    total_to_removed = (
        (TOTAL_DATA_FILES, DELETED_DATA_FILES),
        (TOTAL_DELETE_FILES, REMOVED_DELETE_FILES),
        (TOTAL_RECORDS, DELETED_RECORDS),
        (TOTAL_FILE_SIZE, REMOVED_FILE_SIZE),
        (TOTAL_POSITION_DELETES, REMOVED_POSITION_DELETES),
        (TOTAL_EQUALITY_DELETES, REMOVED_EQUALITY_DELETES),
    )

    # All totals are zeroed first, before any previous value is parsed.
    for total_property, _ in total_to_removed:
        summary[total_property] = '0'

    for total_property, removed_property in total_to_removed:
        raw_total = previous_summary.get(total_property) or '0'
        try:
            previous_total = int(raw_total)
        except ValueError as e:
            raise ValueError(f"Could not parse summary property {total_property} to an int: {raw_total}") from e

        # A previous total of zero leaves the removed property untouched.
        if previous_total:
            summary[removed_property] = str(previous_total)

    return summary
| |
| |
def update_snapshot_summaries(
    summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
) -> Summary:
    """Fill in the ``total-*`` properties of ``summary`` from the previous snapshot's totals.

    Each new total is the previous total, plus the matching added property of
    ``summary``, minus the matching removed/deleted property. A total is only
    written when the previous summary has a non-empty value for it and the
    result is not negative.

    Args:
        summary: Summary of the new snapshot; it is modified in place.
        previous_summary: Summary properties of the previous snapshot. When
            missing or empty, all previous totals are taken to be ``'0'``.
        truncate_full_table: When true, the operation is overwrite and a
            previous summary was passed, ``summary`` is first rewritten to
            record the removal of all previous content.

    Returns:
        The updated ``summary``.

    Raises:
        ValueError: If the operation is neither append nor overwrite, or if a
            property that is needed cannot be parsed as an int.
    """
    if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}:
        raise ValueError(f"Operation not implemented: {summary.operation}")

    if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
        summary = _truncate_table_summary(summary, previous_summary)

    if not previous_summary:
        previous_summary = {
            TOTAL_DATA_FILES: '0',
            TOTAL_DELETE_FILES: '0',
            TOTAL_RECORDS: '0',
            TOTAL_FILE_SIZE: '0',
            TOTAL_POSITION_DELETES: '0',
            TOTAL_EQUALITY_DELETES: '0',
        }

    def _parse_int(property_name: str, raw_value: str) -> int:
        # Parse one property on its own so that the error names the property
        # and value that actually failed, not always the total.
        try:
            return int(raw_value)
        except ValueError as e:
            raise ValueError(f"Could not parse summary property {property_name} to an int: {raw_value}") from e

    def _update_totals(total_property: str, added_property: str, removed_property: str) -> None:
        previous_total_str = previous_summary.get(total_property)
        if not previous_total_str:
            # No previous total to build on: leave this total as it is.
            return

        new_total = _parse_int(total_property, previous_total_str)
        # Added/removed values are only read while the running total is non-negative.
        if new_total >= 0 and (added := summary.get(added_property)):
            new_total += _parse_int(added_property, added)
        if new_total >= 0 and (removed := summary.get(removed_property)):
            new_total -= _parse_int(removed_property, removed)

        # A negative result is not written back.
        if new_total >= 0:
            summary[total_property] = str(new_total)

    # (total, added, removed) property triples, processed in this order.
    for total_property, added_property, removed_property in (
        (TOTAL_DATA_FILES, ADDED_DATA_FILES, DELETED_DATA_FILES),
        (TOTAL_DELETE_FILES, ADDED_DELETE_FILES, REMOVED_DELETE_FILES),
        (TOTAL_RECORDS, ADDED_RECORDS, DELETED_RECORDS),
        (TOTAL_FILE_SIZE, ADDED_FILE_SIZE, REMOVED_FILE_SIZE),
        (TOTAL_POSITION_DELETES, ADDED_POSITION_DELETES, REMOVED_POSITION_DELETES),
        (TOTAL_EQUALITY_DELETES, ADDED_EQUALITY_DELETES, REMOVED_EQUALITY_DELETES),
    ):
        _update_totals(
            total_property=total_property,
            added_property=added_property,
            removed_property=removed_property,
        )

    return summary