blob: 1cffc60f4e565bb0833b09382ec0aff5eacca27a [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List, Optional
from pypaimon.branch.branch_manager import BranchManager
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_exception import (
BranchAlreadyExistException,
BranchNotExistException,
TagNotExistException
)
from pypaimon.catalog.catalog_loader import CatalogLoader
from pypaimon.common.identifier import Identifier
class CatalogBranchManager(BranchManager):
"""
Branch manager for managing branches via catalog.
This implementation delegates branch operations to the underlying catalog,
which handles branch storage and management at the catalog level.
This is useful for catalog-based table access (e.g., REST catalog).
"""
def __init__(self, catalog_loader: CatalogLoader, identifier: Identifier):
"""Initialize CatalogBranchManager.
Args:
catalog_loader: CatalogLoader instance to load catalog
identifier: Identifier for the table
"""
self.catalog_loader = catalog_loader
self.identifier = identifier
def _execute_post(self, func) -> None:
"""Execute a function that doesn't return a value with the catalog.
Args:
func: Function that takes a Catalog as parameter
Raises:
ValueError: If branch or tag operations fail
"""
catalog = self.catalog_loader.load()
try:
func(catalog)
except BranchNotExistException as e:
raise ValueError(f"Branch name '{e.branch}' doesn't exist.")
except TagNotExistException as e:
raise ValueError(f"Tag '{e.tag}' doesn't exist.")
except BranchAlreadyExistException as e:
raise ValueError(f"Branch name '{e.branch}' already exists.")
def _execute_get(self, func):
"""Execute a function that returns a value with the catalog.
Args:
func: Function that takes a Catalog as parameter and returns a value
Returns:
The value returned by the function
"""
catalog = self.catalog_loader.load()
return func(catalog)
def create_branch(
self,
branch_name: str,
tag_name: Optional[str] = None,
ignore_if_exists: bool = False
) -> None:
"""
Create a branch from the current state or from a tag.
Args:
branch_name: Name of the branch to create
tag_name: Optional tag name to create branch from, None for current state
ignore_if_exists: If true, do nothing when branch already exists
Raises:
ValueError: If branch name is invalid or branch already exists (and ignore_if_exists=False)
"""
def _create(catalog: Catalog):
BranchManager.validate_branch(branch_name)
existing_branches = catalog.list_branches(self.identifier)
if branch_name in existing_branches:
if not ignore_if_exists:
raise ValueError(f"Branch name '{branch_name}' already exists.")
return
catalog.create_branch(self.identifier, branch_name, tag_name)
self._execute_post(_create)
def drop_branch(self, branch_name: str) -> None:
"""
Drop a branch.
Args:
branch_name: Name of the branch to drop
Raises:
ValueError: If branch doesn't exist
"""
def _drop(catalog: Catalog):
catalog.drop_branch(self.identifier, branch_name)
self._execute_post(_drop)
def rename_branch(self, from_branch: str, to_branch: str) -> None:
"""
Rename a branch.
Args:
from_branch: Current name of the branch
to_branch: New name for the branch
Raises:
ValueError: If from_branch or to_branch is invalid
"""
def _rename(catalog: Catalog):
catalog.rename_branch(self.identifier, from_branch, to_branch)
self._execute_post(_rename)
def fast_forward(self, branch_name: str) -> None:
"""
Fast forward the current branch to the specified branch.
Args:
branch_name: The branch to fast forward to
Raises:
ValueError: If fast-forward parameters are invalid
"""
def _fast_forward(catalog: Catalog):
current_branch = self.identifier.get_branch_name_or_default()
BranchManager.fast_forward_validate(branch_name, current_branch)
catalog.fast_forward(self.identifier, branch_name)
self._execute_post(_fast_forward)
def branches(self) -> List[str]:
"""
List all branches.
Returns:
List of branch names
"""
def _list(catalog: Catalog):
return catalog.list_branches(self.identifier)
return self._execute_get(_list)