blob: 82ecb9e4931136ccd8f8e574b92f6d0a12539d6c [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
from unittest.mock import MagicMock
from uuid import uuid4

import pytest

from pyiceberg.table import CommitTableResponse, Table
def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None:
    """Test that a HEAD (branch) snapshot cannot be expired.

    The catalog is replaced by a ``MagicMock`` and the table refs are rewritten so
    that the ``main`` branch points at the snapshot the test then tries to expire.
    The attempt must raise ``ValueError`` and must never reach ``commit_table``.
    """
    HEAD_SNAPSHOT = 3051729675574597004
    KEEP_SNAPSHOT = 3055729675574597004

    # Mock the catalog so any commit attempt is recorded instead of executed.
    table_v2.catalog = MagicMock()

    # Simulate refs protecting HEAD_SNAPSHOT as a branch.
    table_v2.metadata = table_v2.metadata.model_copy(
        update={
            "refs": {
                "main": MagicMock(snapshot_id=HEAD_SNAPSHOT, snapshot_ref_type="branch"),
                "tag1": MagicMock(snapshot_id=KEEP_SNAPSHOT, snapshot_ref_type="tag"),
            }
        }
    )

    # Assert fixture data: at least one ref points at the snapshot under test.
    assert any(ref.snapshot_id == HEAD_SNAPSHOT for ref in table_v2.metadata.refs.values())

    # `match` is treated as a regular expression; escape the message so the
    # trailing "." is matched literally rather than as "any character".
    expected_message = re.escape(f"Snapshot with ID {HEAD_SNAPSHOT} is protected and cannot be expired.")
    with pytest.raises(ValueError, match=expected_message):
        table_v2.expire_snapshots().expire_snapshot_by_id(HEAD_SNAPSHOT).commit()

    # The failure must happen before anything is sent to the catalog.
    table_v2.catalog.commit_table.assert_not_called()
def test_cannot_expire_tagged_snapshot(table_v2: Table) -> None:
    """Test that a tagged snapshot cannot be expired.

    The catalog is replaced by a ``MagicMock`` and the table refs are rewritten so
    that ``tag1`` points at the snapshot the test then tries to expire. The attempt
    must raise ``ValueError`` and must never reach ``commit_table``.
    """
    TAGGED_SNAPSHOT = 3051729675574597004
    KEEP_SNAPSHOT = 3055729675574597004

    # Mock the catalog so any commit attempt is recorded instead of executed.
    table_v2.catalog = MagicMock()

    # Simulate refs protecting TAGGED_SNAPSHOT as a tag.
    table_v2.metadata = table_v2.metadata.model_copy(
        update={
            "refs": {
                "tag1": MagicMock(snapshot_id=TAGGED_SNAPSHOT, snapshot_ref_type="tag"),
                "main": MagicMock(snapshot_id=KEEP_SNAPSHOT, snapshot_ref_type="branch"),
            }
        }
    )

    # Assert fixture data: at least one ref points at the snapshot under test.
    assert any(ref.snapshot_id == TAGGED_SNAPSHOT for ref in table_v2.metadata.refs.values())

    # `match` is treated as a regular expression; escape the message so the
    # trailing "." is matched literally rather than as "any character".
    expected_message = re.escape(f"Snapshot with ID {TAGGED_SNAPSHOT} is protected and cannot be expired.")
    with pytest.raises(ValueError, match=expected_message):
        table_v2.expire_snapshots().expire_snapshot_by_id(TAGGED_SNAPSHOT).commit()

    # The failure must happen before anything is sent to the catalog.
    table_v2.catalog.commit_table.assert_not_called()
def test_expire_unprotected_snapshot(table_v2: Table) -> None:
    """Verify that a snapshot referenced by no branch or tag can be expired."""
    expired_id = 3051729675574597004
    retained_id = 3055729675574597004

    # Canned catalog response whose metadata lists only the retained snapshot id.
    metadata_after_commit = table_v2.metadata.model_copy(update={"snapshots": [retained_id]})
    commit_response = CommitTableResponse(
        metadata=metadata_after_commit,
        metadata_location="mock://metadata/location",
        uuid=uuid4(),
    )
    table_v2.catalog = MagicMock()
    table_v2.catalog.commit_table.return_value = commit_response

    # Point every ref at the retained snapshot so nothing protects the one being expired.
    refs_on_retained = {
        "main": MagicMock(snapshot_id=retained_id, snapshot_ref_type="branch"),
        "tag1": MagicMock(snapshot_id=retained_id, snapshot_ref_type="tag"),
    }
    table_v2.metadata = table_v2.metadata.model_copy(update={"refs": refs_on_retained})

    # Sanity-check the fixture: no ref may reference the snapshot under test.
    assert not any(ref.snapshot_id == expired_id for ref in table_v2.metadata.refs.values())

    table_v2.expire_snapshots().expire_snapshot_by_id(expired_id).commit()

    # Exactly one commit goes to the (mocked) catalog, and the resulting
    # metadata holds a single entry that is not the expired id.
    table_v2.catalog.commit_table.assert_called_once()
    snapshots_left = table_v2.metadata.snapshots
    assert expired_id not in snapshots_left
    assert len(snapshots_left) == 1
def test_expire_nonexistent_snapshot_raises(table_v2: Table) -> None:
    """Test that trying to expire a non-existent snapshot raises an error.

    The catalog is mocked and all refs are removed, then an id that the test
    expects to be absent from the table is expired. The attempt must raise
    ``ValueError`` and must never reach ``commit_table``.
    """
    NONEXISTENT_SNAPSHOT = 9999999999999999999

    table_v2.catalog = MagicMock()
    # No refs at all, so ref protection cannot be the reason for the failure.
    table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}})

    # `match` is treated as a regular expression; escape the message so the
    # trailing "." is matched literally rather than as "any character".
    expected_message = re.escape(f"Snapshot with ID {NONEXISTENT_SNAPSHOT} does not exist.")
    with pytest.raises(ValueError, match=expected_message):
        table_v2.expire_snapshots().expire_snapshot_by_id(NONEXISTENT_SNAPSHOT).commit()

    # The failure must happen before anything is sent to the catalog.
    table_v2.catalog.commit_table.assert_not_called()
def test_expire_snapshots_by_timestamp_skips_protected(table_v2: Table) -> None:
    """Verify that expiring by timestamp leaves branch- and tag-referenced snapshots alone."""
    from types import SimpleNamespace

    branch_head_id = 3051729675574597004
    tagged_id = 3055729675574597004

    # Two old snapshots (timestamp_ms=1): one referenced by the "main" branch,
    # the other by the "mytag" tag, so both are protected.
    protecting_refs = {
        "main": MagicMock(snapshot_id=branch_head_id, snapshot_ref_type="branch"),
        "mytag": MagicMock(snapshot_id=tagged_id, snapshot_ref_type="tag"),
    }
    old_snapshots = [
        SimpleNamespace(snapshot_id=snapshot_id, timestamp_ms=1, parent_snapshot_id=None)
        for snapshot_id in (branch_head_id, tagged_id)
    ]
    table_v2.metadata = table_v2.metadata.model_copy(update={"refs": protecting_refs, "snapshots": old_snapshots})
    table_v2.catalog = MagicMock()

    # The mocked commit echoes the current metadata back (simulating "no change").
    commit_response = CommitTableResponse(
        metadata=table_v2.metadata,
        metadata_location="mock://metadata/location",
        uuid=uuid4(),
    )
    table_v2.catalog.commit_table.return_value = commit_response

    # Cutoff far in the future, so both snapshots are old enough to be candidates.
    cutoff_ms = 9999999999999
    table_v2.expire_snapshots().expire_snapshots_older_than(cutoff_ms).commit()

    # Mirror the committed metadata onto the table before inspecting it.
    table_v2.metadata = commit_response.metadata

    # Both protected snapshots must still be present.
    surviving_ids = {snapshot.snapshot_id for snapshot in table_v2.metadata.snapshots}
    assert branch_head_id in surviving_ids
    assert tagged_id in surviving_ids

    # commit_table is still invoked, but its remove-snapshots update must list no ids.
    # The updates are read from the third positional argument of the recorded call.
    positional, _ = table_v2.catalog.commit_table.call_args
    updates = positional[2] if len(positional) > 2 else ()
    remove_update = next(
        (update for update in updates if getattr(update, "action", None) == "remove-snapshots"),
        None,
    )
    assert remove_update is not None
    assert remove_update.snapshot_ids == []
def test_expire_snapshots_by_ids(table_v2: Table) -> None:
    """Test that multiple unprotected snapshots can be expired by IDs.

    The catalog is mocked to return metadata listing only ``KEEP_SNAPSHOT``. The
    table metadata is rewritten to hold three snapshots, with every ref pointing
    at ``KEEP_SNAPSHOT`` so the other two are unprotected.
    """
    from types import SimpleNamespace

    EXPIRE_SNAPSHOT_1 = 3051729675574597004
    EXPIRE_SNAPSHOT_2 = 3051729675574597005
    KEEP_SNAPSHOT = 3055729675574597004

    # Canned catalog response, built from the fixture metadata before it is rewritten below.
    mock_response = CommitTableResponse(
        metadata=table_v2.metadata.model_copy(update={"snapshots": [KEEP_SNAPSHOT]}),
        metadata_location="mock://metadata/location",
        uuid=uuid4(),
    )
    table_v2.catalog = MagicMock()
    table_v2.catalog.commit_table.return_value = mock_response

    # Single metadata rewrite: refs only protect KEEP_SNAPSHOT, and the snapshot
    # list contains the two snapshots to expire plus the one to keep.
    table_v2.metadata = table_v2.metadata.model_copy(
        update={
            "refs": {
                "main": MagicMock(snapshot_id=KEEP_SNAPSHOT, snapshot_ref_type="branch"),
                "tag1": MagicMock(snapshot_id=KEEP_SNAPSHOT, snapshot_ref_type="tag"),
            },
            "snapshots": [
                SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT_1, timestamp_ms=1, parent_snapshot_id=None),
                SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT_2, timestamp_ms=1, parent_snapshot_id=None),
                SimpleNamespace(snapshot_id=KEEP_SNAPSHOT, timestamp_ms=2, parent_snapshot_id=None),
            ],
        }
    )

    # Assert fixture data: no ref may reference either snapshot under test.
    assert all(ref.snapshot_id not in (EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2) for ref in table_v2.metadata.refs.values())

    # Expire the snapshots
    table_v2.expire_snapshots().expire_snapshots_by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2]).commit()

    # Exactly one commit goes to the (mocked) catalog, and the resulting
    # metadata holds a single entry that is neither of the expired ids.
    table_v2.catalog.commit_table.assert_called_once()
    remaining_snapshots = table_v2.metadata.snapshots
    assert EXPIRE_SNAPSHOT_1 not in remaining_snapshots
    assert EXPIRE_SNAPSHOT_2 not in remaining_snapshots
    assert len(remaining_snapshots) == 1