blob: 8218ab364526223b96cb700137ec7308412d93e0 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Snapshot loader for loading snapshots from a catalog."""
from typing import Optional
class SnapshotLoader:
    """Loader to load the latest snapshot of a table from a catalog.

    The loader holds a catalog loader and a table identifier. Each call to
    :meth:`load` asks the catalog loader for a catalog and then queries
    that catalog's ``load_snapshot`` method with the stored identifier.
    """

    def __init__(self, catalog_loader, identifier) -> None:
        """Initialize the loader with a catalog loader and table identifier.

        Args:
            catalog_loader: Object whose ``load()`` method returns a catalog
                exposing ``load_snapshot(identifier)``.
            identifier: The table identifier passed to ``load_snapshot``.
        """
        self.catalog_loader = catalog_loader
        self.identifier = identifier

    def copy_with_branch(self, branch: str) -> 'SnapshotLoader':
        """Return a new loader whose identifier carries ``branch``.

        Mirrors Java ``SnapshotLoaderImpl.copyWithBranch``
        (tag/SnapshotLoaderImpl.java).

        The database and table names are copied from the current identifier;
        the catalog loader is shared with the new instance. This loader is
        left unmodified.

        Args:
            branch: Branch name to set on the new identifier.

        Returns:
            A new ``SnapshotLoader`` bound to the re-branched identifier.
        """
        # Imported at call time rather than at module level, as in the
        # original code (NOTE(review): presumably to avoid an import cycle
        # -- confirm before hoisting).
        from pypaimon.common.identifier import Identifier

        rebranched = Identifier(
            database=self.identifier.get_database_name(),
            object=self.identifier.get_table_name(),
            branch=branch,
        )
        return SnapshotLoader(self.catalog_loader, rebranched)

    def load(self) -> Optional[str]:
        """Load the latest snapshot from the catalog.

        Returns:
            The ``snapshot`` attribute of the object returned by the
            catalog's ``load_snapshot`` (the latest snapshot JSON string),
            or None if the catalog returned None.

        Raises:
            RuntimeError: If creating the catalog or loading the snapshot
                fails. A ``RuntimeError`` raised by the underlying calls
                propagates unchanged; any other exception is wrapped and
                attached as ``__cause__``.
        """
        try:
            catalog = self.catalog_loader.load()
            table_snapshot = catalog.load_snapshot(self.identifier)
        except RuntimeError:
            # Already the advertised exception type: re-raise as-is,
            # keeping the original traceback.
            raise
        except Exception as e:
            # Chain explicitly so the root cause stays attached to the
            # wrapper and the wrap shows up as deliberate in tracebacks.
            raise RuntimeError(f"Failed to load snapshot: {e}") from e

        if table_snapshot is None:
            return None
        return table_snapshot.snapshot