| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
import logging
from typing import Optional

from pypaimon.common.file_io import FileIO
from pypaimon.common.json_util import JSON
from pypaimon.snapshot.snapshot import Snapshot

# Module-level logger; defined after all imports so the import group stays contiguous.
logger = logging.getLogger(__name__)
| |
| |
class SnapshotManager:
    """Manager for snapshot files using unified FileIO.

    Snapshot metadata lives under ``<table_path>/snapshot``: one JSON file per
    snapshot named ``snapshot-<id>``, plus ``LATEST`` and ``EARLIEST`` hint
    files whose text content is a snapshot ID.
    """

    def __init__(self, table):
        # Imported locally, presumably to avoid a circular import — TODO confirm.
        from pypaimon.table.file_store_table import FileStoreTable

        self.table: FileStoreTable = table
        self.file_io: FileIO = self.table.file_io
        snapshot_path = self.table.table_path.rstrip('/')
        self.snapshot_dir = f"{snapshot_path}/snapshot"
        self.latest_file = f"{self.snapshot_dir}/LATEST"

    def get_latest_snapshot(self) -> Optional[Snapshot]:
        """Return the latest snapshot, or None if it cannot be located."""
        snapshot_json = self.get_latest_snapshot_json()
        if snapshot_json is None:
            return None
        return JSON.from_json(snapshot_json, Snapshot)

    def get_latest_snapshot_json(self) -> Optional[str]:
        """Return the raw JSON text of the latest snapshot.

        Returns:
            The snapshot file contents, or None when the LATEST hint file does
            not exist or the snapshot file it points to does not exist.
        """
        if not self.file_io.exists(self.latest_file):
            return None

        latest_snapshot_id = int(self.read_latest_file().strip())
        snapshot_file = self.get_snapshot_path(latest_snapshot_id)
        if not self.file_io.exists(snapshot_file):
            return None

        return self.file_io.read_file_utf8(snapshot_file)

    def read_latest_file(self, max_retries: int = 5) -> str:
        """Read the latest snapshot ID from the LATEST hint file.

        The file is read up to ``max_retries`` times, sleeping 1 ms between
        attempts, because it may be momentarily missing, empty or unreadable
        (presumably while a concurrent writer replaces it — TODO confirm).
        If no attempt yields content, the snapshot directory is listed and the
        highest ``snapshot-<id>`` ID found there is returned instead.

        Args:
            max_retries: Number of attempts to read LATEST before falling back
                to a directory scan.

        Returns:
            The snapshot ID as a stripped decimal string.

        Raises:
            RuntimeError: If the fallback scan finds no snapshot files.
        """
        import re
        import time

        for attempt in range(max_retries):
            try:
                if self.file_io.exists(self.latest_file):
                    content = self.file_io.read_file_utf8(self.latest_file)
                    if content and content.strip():
                        return content.strip()
            except Exception as e:
                # Best-effort read: record the failure and retry rather than abort.
                logger.debug(
                    "Failed to read %s (attempt %d/%d): %s",
                    self.latest_file, attempt + 1, max_retries, e,
                )
            # Missing, empty or unreadable: pause briefly unless this was the last try.
            if attempt < max_retries - 1:
                time.sleep(0.001)

        # Fallback: derive the latest ID from the snapshot files themselves.
        snapshot_pattern = re.compile(r'^snapshot-(\d+)$')
        snapshot_ids = []
        for file_info in self.file_io.list_status(self.snapshot_dir):
            filename = file_info.path.split('/')[-1]
            match = snapshot_pattern.match(filename)
            if match:
                snapshot_ids.append(int(match.group(1)))

        # Explicit emptiness check so that a (valid) ID of 0 is not mistaken for "none".
        if not snapshot_ids:
            raise RuntimeError(f"No snapshot content found in {self.snapshot_dir}")

        return str(max(snapshot_ids))

    def get_snapshot_path(self, snapshot_id: int) -> str:
        """
        Get the path for a snapshot file.

        Args:
            snapshot_id: The snapshot ID

        Returns:
            Path to the snapshot file
        """
        return f"{self.snapshot_dir}/snapshot-{snapshot_id}"

    def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
        """Return the earliest snapshot, or None if it cannot be loaded.

        Uses the EARLIEST hint file when it exists and is non-blank. Otherwise
        it falls back to snapshot 1. A warning is logged when the hinted
        snapshot no longer exists.
        """
        earliest_file = f"{self.snapshot_dir}/EARLIEST"
        if self.file_io.exists(earliest_file):
            earliest_content = self.file_io.read_file_utf8(earliest_file)
            # A blank hint cannot be parsed as an ID; treat it like a missing hint.
            if earliest_content and earliest_content.strip():
                earliest_snapshot_id = int(earliest_content.strip())
                snapshot = self.get_snapshot_by_id(earliest_snapshot_id)
                if snapshot is None:
                    logger.warning(
                        "The earliest snapshot or changelog was once identified but disappeared. "
                        "It might have been expired by other jobs operating on this table."
                    )
                return snapshot
        return self.get_snapshot_by_id(1)

    def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]:
        """
        Find the latest snapshot with time_millis <= the given timestamp.

        Binary-searches snapshot IDs between the earliest and latest snapshots.
        This relies on commit times increasing with snapshot ID.

        Args:
            timestamp: The timestamp to compare against

        Returns:
            The latest snapshot with time_millis <= timestamp, or None if no such
            snapshot exists (including when the table has no snapshots).
        """
        latest_snapshot = self.get_latest_snapshot()
        if latest_snapshot is None:
            return None

        # Start from the earliest surviving snapshot instead of assuming ID 1 exists.
        earliest_snapshot = self.try_get_earliest_snapshot()
        earliest = earliest_snapshot.id if earliest_snapshot is not None else 1
        latest = latest_snapshot.id
        final_snapshot = None

        while earliest <= latest:
            mid = earliest + (latest - earliest) // 2
            snapshot = self.get_snapshot_by_id(mid)
            if snapshot is None:
                # Snapshot file is gone (e.g. expired concurrently). Assumes expiration
                # removes the lowest IDs first, so keep searching above it — TODO confirm.
                earliest = mid + 1
                continue

            commit_time = snapshot.time_millis
            if commit_time > timestamp:
                latest = mid - 1
            elif commit_time < timestamp:
                earliest = mid + 1
                final_snapshot = snapshot
            else:
                final_snapshot = snapshot
                break

        return final_snapshot

    def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
        """
        Get a snapshot by its ID.

        Args:
            snapshot_id: The snapshot ID

        Returns:
            The snapshot with the specified ID, or None if not found
        """
        snapshot_file = self.get_snapshot_path(snapshot_id)
        if not self.file_io.exists(snapshot_file):
            return None

        snapshot_content = self.file_io.read_file_utf8(snapshot_file)
        return JSON.from_json(snapshot_content, Snapshot)