blob: b49c4abe07598e317d534caf4c3622f17d12a92f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Iterator, Optional, Set
from pyiceberg.exceptions import ValidationException
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record
# Operations whose snapshots are scanned by _deleted_data_files for DELETED data-file entries.
VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {
    Operation.DELETE,
    Operation.OVERWRITE,
    Operation.REPLACE,
}
# Operations whose snapshots are scanned by _added_data_files for newly written data-file entries.
VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {
    Operation.APPEND,
    Operation.OVERWRITE,
}
def _validation_history(
    table: Table,
    from_snapshot: Snapshot,
    to_snapshot: Snapshot,
    matching_operations: set[Operation],
    manifest_content_filter: ManifestContent,
) -> tuple[list[ManifestFile], set[int]]:
    """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot.

    Walks the snapshots produced by ``ancestors_between(from_snapshot, to_snapshot, table.metadata)``.
    For every visited snapshot whose summary operation is in ``matching_operations``, the snapshot ID
    is recorded, and the manifests that snapshot itself added (``added_snapshot_id`` equal to its ID)
    with content ``manifest_content_filter`` are collected.

    Args:
        table: Table to get the history from
        from_snapshot: Parent snapshot to get the history from
        to_snapshot: Starting snapshot
        matching_operations: Operations to match on
        manifest_content_filter: Manifest content type to filter

    Raises:
        ValidationException: If a visited snapshot has no summary, or if the last snapshot visited
            is not ``from_snapshot``. Nothing is raised when no snapshot is visited at all.

    Returns:
        List of manifest files and set of snapshot IDs matching the conditions
    """
    collected_manifests: list[ManifestFile] = []
    matched_ids: set[int] = set()
    final_ancestor: Optional[Snapshot] = None

    for ancestor in ancestors_between(from_snapshot, to_snapshot, table.metadata):
        # Remember the most recent ancestor before any check, so the end-of-walk check below sees it.
        final_ancestor = ancestor
        ancestor_summary = ancestor.summary
        if ancestor_summary is None:
            raise ValidationException(f"No summary found for snapshot {ancestor}!")

        if ancestor_summary.operation in matching_operations:
            ancestor_id = ancestor.snapshot_id
            matched_ids.add(ancestor_id)
            # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom
            # (we can easily merge the sets).
            collected_manifests.extend(
                manifest_file
                for manifest_file in ancestor.manifests(table.io)
                # Only manifests written by this very snapshot, of the requested content type.
                if manifest_file.added_snapshot_id == ancestor_id and manifest_file.content == manifest_content_filter
            )

    # The walk must finish on from_snapshot; otherwise the history is incomplete.
    if final_ancestor is not None and final_ancestor.snapshot_id != from_snapshot.snapshot_id:
        raise ValidationException("No matching snapshot found.")

    return collected_manifests, matched_ids
def _filter_manifest_entries(
    entry: ManifestEntry,
    snapshot_ids: set[int],
    data_filter: Optional[BooleanExpression],
    partition_set: Optional[dict[int, set[Record]]],
    entry_status: Optional[ManifestEntryStatus],
    schema: Schema,
) -> bool:
    """Decide whether a manifest entry passes every requested filter.

    Checks are applied in order, and the first failing check rejects the entry:
    provenance (``snapshot_ids``), then status, then the metrics filter, then partition membership.
    A filter argument of ``None`` disables that check.

    Args:
        entry: Manifest entry to filter
        snapshot_ids: set of snapshot ids to match data files
        data_filter: Optional filter to match data files
        partition_set: Optional mapping of spec_id to the set of partitions to match data files
        entry_status: Optional status to match data files
        schema: schema used to bind ``data_filter``

    Returns:
        True if the entry should be included, False otherwise
    """
    # Provenance: the entry must have been written by one of the snapshots being validated.
    if entry.snapshot_id not in snapshot_ids:
        return False

    # Status: when a status is requested, only entries with exactly that status qualify.
    status_mismatch = entry_status is not None and entry.status != entry_status
    if status_mismatch:
        return False

    # Metrics: reject the entry when the inclusive metrics evaluator says no row can match.
    if data_filter is not None:
        metrics_evaluator = _InclusiveMetricsEvaluator(schema, data_filter)
        if metrics_evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH:
            return False

    # Partition: when requested, the file's (spec_id, partition) pair must be listed.
    if partition_set is None:
        return True
    data_file = entry.data_file
    file_spec_id = data_file.spec_id
    return file_spec_id in partition_set and data_file.partition in partition_set[file_spec_id]
def _deleted_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    partition_set: Optional[dict[int, set[Record]]],
    parent_snapshot: Optional[Snapshot],
) -> Iterator[ManifestEntry]:
    """Find deleted data files matching a filter since a starting snapshot.

    This is a generator: nothing is read until it is iterated.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find deleted data files
        partition_set: dict of {spec_id: set[partition]} to filter on
        parent_snapshot: Ending snapshot on the branch being validated

    Yields:
        Manifest entries with status DELETED, from data manifests added by snapshots whose operation is in
        VALIDATE_DATA_FILES_EXIST_OPERATIONS, that also match ``data_filter`` and ``partition_set``.
    """
    # if there is no current table state, no files have been deleted
    if parent_snapshot is None:
        return

    manifests, snapshot_ids = _validation_history(
        table,
        parent_snapshot,
        starting_snapshot,
        VALIDATE_DATA_FILES_EXIST_OPERATIONS,
        ManifestContent.DATA,
    )

    # Nothing below modifies `table`, so resolve its schema once instead of once per manifest entry.
    schema = table.schema()
    for manifest in manifests:
        # discard_deleted=False: the DELETED entries are exactly the ones this search is after.
        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
            if _filter_manifest_entries(entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.DELETED, schema):
                yield entry
def _validate_deleted_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    parent_snapshot: Optional[Snapshot],
) -> None:
    """Validate that no files matching a filter have been deleted from the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find deleted data files
        parent_snapshot: Ending snapshot on the branch being validated; ``None`` means there is
            no current table state, so nothing can have been deleted

    Raises:
        ValidationException: If any deleted data file matches ``data_filter``. The message lists
            the snapshot IDs of every conflicting entry.
    """
    # Materialize the generator before testing it. Testing it with ``any()`` consumes the first
    # conflicting entry, so that entry's snapshot would be missing from the reported set, and with a
    # single conflict the set would be empty.
    conflicting_entries = list(_deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot))
    if conflicting_entries:
        conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries}
        raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!")
def _added_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    partition_set: Optional[dict[int, set[Record]]],
    parent_snapshot: Optional[Snapshot],
) -> Iterator[ManifestEntry]:
    """Return manifest entries for data files added between the starting snapshot and parent snapshot.

    This is a generator: nothing is read until it is iterated.

    Args:
        table: Table to get the history from
        starting_snapshot: Starting snapshot to get the history from
        data_filter: Optional filter to match data files
        partition_set: Optional set of partitions to match data files
        parent_snapshot: Parent snapshot to get the history from; ``None`` yields nothing

    Returns:
        Iterator of manifest entries for added data files matching the conditions
    """
    if parent_snapshot is None:
        return

    manifests, snapshot_ids = _validation_history(
        table,
        parent_snapshot,
        starting_snapshot,
        VALIDATE_ADDED_DATA_FILES_OPERATIONS,
        ManifestContent.DATA,
    )

    # Nothing below modifies `table`, so resolve its schema once instead of once per manifest entry.
    schema = table.schema()
    for manifest in manifests:
        # NOTE(review): discard_deleted is left at its default here, presumably dropping DELETED entries — confirm.
        for entry in manifest.fetch_manifest_entry(table.io):
            if _filter_manifest_entries(entry, snapshot_ids, data_filter, partition_set, None, schema):
                yield entry
def _validate_added_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    parent_snapshot: Optional[Snapshot],
) -> None:
    """Validate that no files matching a filter have been added to the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find added data files
        parent_snapshot: Ending snapshot on the branch being validated; ``None`` means nothing to check

    Raises:
        ValidationException: If any added data file matches ``data_filter``. The message lists
            the snapshot IDs of every conflicting entry.
    """
    # Materialize the generator before testing it. Testing it with ``any()`` consumes the first
    # conflicting entry, so that entry's snapshot would be missing from the reported set, and with a
    # single conflict the set would be empty.
    conflicting_entries = list(_added_data_files(table, starting_snapshot, data_filter, None, parent_snapshot))
    if conflicting_entries:
        conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
        raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")