blob: f7fe2c45bcb7931f948e0982446ecc8d929efff7 [file]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
from pypaimon.common.json_util import JSON
logger = logging.getLogger(__name__)
class RollbackHelper:
    """Helper class for table rollback, including utilities to clean snapshots and tags.

    All file-system access goes through the injected ``file_io``. Snapshot
    paths and the LATEST file location come from ``snapshot_manager``. Tag
    listing, lookup and deletion go through ``tag_manager``.
    """

    def __init__(self, snapshot_manager, tag_manager, file_io):
        """Initialize RollbackHelper.

        Args:
            snapshot_manager: The SnapshotManager instance.
            tag_manager: The TagManager instance.
            file_io: The FileIO instance for file operations.
        """
        self._snapshot_manager = snapshot_manager
        self._tag_manager = tag_manager
        self._file_io = file_io

    def clean_larger_than(self, retained_snapshot):
        """Clean snapshots and tags whose id is larger than the retained snapshot.

        Steps, in order:

        1. Write the retained snapshot id to the LATEST file.
        2. Delete snapshot files from the current latest id down to
           ``retained_snapshot.id + 1``. Ids whose file no longer exists
           are skipped.
        3. Delete every tag whose snapshot id is larger than the retained id.

        Does nothing if the snapshot manager reports no latest snapshot.

        Args:
            retained_snapshot: The snapshot to retain; everything newer is removed.

        Raises:
            Any exception raised by ``file_io`` while writing the LATEST file
            is propagated. In that case no snapshot or tag is deleted, since
            LATEST would otherwise keep referencing a deleted snapshot.
        """
        latest = self._snapshot_manager.get_latest_snapshot()
        if latest is None:
            return
        retained_id = retained_snapshot.id

        # Move LATEST first so it never refers to a snapshot deleted below.
        # A failure here aborts the rollback instead of being swallowed.
        self._commit_latest_hint(retained_id)
        self._delete_snapshots_after(retained_id, latest.id)
        self._delete_tags_after(retained_id)

    def _commit_latest_hint(self, snapshot_id):
        """Write ``snapshot_id`` to the LATEST file, trying an atomic write first."""
        latest_file = self._snapshot_manager.latest_file
        content = str(snapshot_id)
        if not self._file_io.try_to_write_atomic(latest_file, content):
            # try_to_write_atomic reported failure: fall back to a plain overwrite.
            self._file_io.write_file(latest_file, content, overwrite=True)

    def _delete_snapshots_after(self, retained_id, latest_id):
        """Delete existing snapshot files with ids in ``(retained_id, latest_id]``."""
        # Delete newest first: if the process is interrupted, the snapshot files
        # that remain still form a contiguous range starting at the retained one.
        for snapshot_id in range(latest_id, retained_id, -1):
            snapshot_path = self._snapshot_manager.get_snapshot_path(snapshot_id)
            if self._file_io.exists(snapshot_path):
                self._file_io.delete(snapshot_path)

    def _delete_tags_after(self, retained_id):
        """Delete every tag whose snapshot id is larger than ``retained_id``."""
        for tag_name in self._tag_manager.list_tags():
            tag = self._tag_manager.get(tag_name)
            if tag is None:
                # Tag vanished between listing and lookup; nothing to delete.
                continue
            if tag.trim_to_snapshot().id > retained_id:
                self._tag_manager.delete_tag(tag_name)

    def create_snapshot_file_if_needed(self, tagged_snapshot):
        """Recreate a snapshot file from tag data if the file does not exist.

        When rolling back to a tag, the snapshot file with the tag's id may
        already have been removed (for example by snapshot expiration). In that
        case the tagged snapshot is serialized to JSON and written to the
        snapshot path without overwriting. If the file exists, nothing is done.

        This method only writes the snapshot file; it does not update any
        EARLIEST hint file.

        Args:
            tagged_snapshot: The snapshot from the tag to potentially write.
        """
        snapshot_path = self._snapshot_manager.get_snapshot_path(tagged_snapshot.id)
        if not self._file_io.exists(snapshot_path):
            self._file_io.write_file(
                snapshot_path, JSON.to_json(tagged_snapshot), overwrite=False)