blob: 45fe6e4c43d95573491fbbfb90d35f0b7fab79bc [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
from typing import List, Optional
from pypaimon.branch.branch_manager import BranchManager, BRANCH_PREFIX
from pypaimon.common.file_io import FileIO
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.tag.tag_manager import TagManager
logger = logging.getLogger(__name__)
class FileSystemBranchManager(BranchManager):
    """A BranchManager implementation to manage branches via file system.

    Branches are discovered by listing ``{table_path}/branch``: every entry
    whose name starts with ``BRANCH_PREFIX`` is reported as a branch. The
    location of a single branch is computed by ``BranchManager.branch_path``.
    """

    def __init__(
        self,
        file_io: FileIO,
        table_path: str,
        snapshot_manager: SnapshotManager,
        tag_manager: TagManager,
        schema_manager: SchemaManager,
        current_branch: str = "main"
    ):
        """
        Initialize FileSystemBranchManager.

        Args:
            file_io: FileIO instance for file operations
            table_path: Path to the table root directory (trailing ``/`` is
                stripped before it is stored)
            snapshot_manager: SnapshotManager instance
            tag_manager: TagManager instance
            schema_manager: SchemaManager instance
            current_branch: Current branch name; an empty value means "main"
        """
        self.file_io = file_io
        self.table_path = table_path.rstrip('/')
        self.snapshot_manager = snapshot_manager
        self.tag_manager = tag_manager
        self.schema_manager = schema_manager
        self.current_branch = self._normalize_branch(current_branch)

    @staticmethod
    def _normalize_branch(branch: str) -> str:
        """Return ``branch`` unchanged, or "main" when it is empty or None."""
        return branch or "main"

    def _branch_directory(self) -> str:
        """Return the directory that holds all branch directories."""
        return f"{self.table_path}/branch"

    def branch_path(self, branch_name: str) -> str:
        """
        Return the path of a branch.

        Args:
            branch_name: Name of the branch

        Returns:
            The path to the branch, as computed by
            ``BranchManager.branch_path`` for this table
        """
        return BranchManager.branch_path(self.table_path, branch_name)

    def _copy_with_branch(self, manager, branch: str):
        """
        Return a new manager whose path accessors land under
        ``{table_path}/branch/branch-{name}``.

        Equivalent to the per-manager rebranch calls inlined in Java
        ``FileSystemBranchManager.createBranch(name, tagName)`` (e.g.
        ``snapshotManager.copyWithBranch(branchName)``). ``TagManager``
        is rebuilt directly because the Python ``TagManager`` has no
        ``copy_with_branch`` factory yet.

        Args:
            manager: A TagManager, SchemaManager or SnapshotManager
            branch: Name of the branch the returned manager should target

        Returns:
            The rebranched manager; any other kind of object is returned
            unchanged
        """
        if isinstance(manager, TagManager):
            return TagManager(self.file_io, self.table_path, branch)
        if isinstance(manager, (SchemaManager, SnapshotManager)):
            return manager.copy_with_branch(branch)
        return manager

    def _file_exists(self, path: str) -> bool:
        """
        Check if a file or directory exists.

        Args:
            path: The path to check

        Returns:
            True if path exists, False otherwise
        """
        return self.file_io.exists(path)

    def create_branch(
        self,
        branch_name: str,
        tag_name: Optional[str] = None,
        ignore_if_exists: bool = False
    ) -> None:
        """
        Create a branch from the current state or from a tag.

        Args:
            branch_name: Name of the branch to create
            tag_name: Optional tag name to create branch from, None for current state
            ignore_if_exists: If true, do nothing when branch already exists

        Raises:
            ValueError: If the branch name is rejected by validation or the
                branch already exists
            RuntimeError: If copying files into the branch directory fails
                (the original exception is chained as the cause)
        """
        if ignore_if_exists and self.branch_exists(branch_name):
            return
        self._validate_branch(branch_name)

        # The tag lookup happens outside the wrapping ``try`` on purpose:
        # its own exception (e.g. unknown tag) must reach the caller
        # unchanged rather than being reported as a copy failure.
        snapshot = None
        if tag_name is not None:
            snapshot = self.tag_manager.get_or_throw(tag_name).trim_to_snapshot()

        try:
            if tag_name is None:
                self._create_branch_from_latest_schema(branch_name)
            else:
                self._create_branch_from_tag(branch_name, tag_name, snapshot)
        except Exception as e:
            raise RuntimeError(
                f"Exception occurs when create branch '{branch_name}' "
                f"(directory in {self.branch_path(branch_name)})."
            ) from e

    def _create_branch_from_latest_schema(self, branch_name: str) -> None:
        """Populate a new branch with every schema up to the latest one."""
        latest_schema = self.schema_manager.latest()
        if not latest_schema:
            raise RuntimeError("No schema found for creating branch")
        self._copy_schemas_to_branch(branch_name, latest_schema.id)

    def _create_branch_from_tag(self, branch_name: str, tag_name: str, snapshot) -> None:
        """Copy the tag, its snapshot and the needed schemas into a new branch."""
        branch_tag_manager = TagManager(self.file_io, self.table_path, branch_name)
        branch_snapshot_manager = self._copy_with_branch(
            self.snapshot_manager, branch_name
        )
        # Copy the corresponding tag, snapshot and schema files into the branch directory
        self.file_io.copy_file(
            self.tag_manager.tag_path(tag_name),
            branch_tag_manager.tag_path(tag_name),
            overwrite=True
        )
        self.file_io.copy_file(
            self.snapshot_manager.get_snapshot_path(snapshot.id),
            branch_snapshot_manager.get_snapshot_path(snapshot.id),
            overwrite=True
        )
        self._copy_schemas_to_branch(branch_name, snapshot.schema_id)

    def drop_branch(self, branch_name: str) -> None:
        """
        Drop a branch by recursively deleting its directory.

        Args:
            branch_name: Name of the branch to drop

        Raises:
            ValueError: If branch doesn't exist
            RuntimeError: If deleting the branch directory raises
        """
        if not self.branch_exists(branch_name):
            raise ValueError(f"Branch name '{branch_name}' doesn't exist.")

        branch_dir = self.branch_path(branch_name)
        try:
            # Delete branch directory recursively
            self.file_io.delete(branch_dir, recursive=True)
        except Exception as e:
            logger.warning(
                "Deleting the branch failed due to an exception in deleting the directory "
                "%s. Please try again.",
                branch_dir
            )
            raise RuntimeError(f"Failed to delete branch '{branch_name}'") from e

    def rename_branch(self, from_branch: str, to_branch: str) -> None:
        """
        Rename a branch by renaming its directory.

        Args:
            from_branch: Current name of the branch
            to_branch: New name for the branch

        Raises:
            ValueError: If from_branch or to_branch is blank, from_branch doesn't exist,
                to_branch already exists, or trying to rename the main branch
            RuntimeError: If the rename operation fails
        """
        # Reject a blank source name first so that None / "" never reaches
        # the main-branch check below.
        if not from_branch or from_branch.isspace():
            raise ValueError("Source branch name shouldn't be blank.")
        if self.is_main_branch(from_branch):
            raise ValueError(f"Cannot rename the main branch '{from_branch}'.")
        if not to_branch or to_branch.isspace():
            raise ValueError("Target branch name shouldn't be blank.")

        # Validate the new branch name, keeping the original error as the cause
        try:
            self.validate_branch(to_branch)
        except ValueError as e:
            raise ValueError(f"Invalid target branch name: {e}") from e

        if not self.branch_exists(from_branch):
            raise ValueError(f"Source branch '{from_branch}' doesn't exist.")
        if self.branch_exists(to_branch):
            raise ValueError(f"Target branch '{to_branch}' already exists.")

        try:
            # Rename the branch directory
            from_path = self.branch_path(from_branch)
            to_path = self.branch_path(to_branch)
            self.file_io.rename(from_path, to_path)
            logger.info(
                "Successfully renamed branch from '%s' to '%s'", from_branch, to_branch
            )
        except Exception as e:
            logger.warning(
                "Renaming branch from '%s' to '%s' failed due to an exception. "
                "Please try again.",
                from_branch,
                to_branch
            )
            raise RuntimeError(
                f"Failed to rename branch from '{from_branch}' to '{to_branch}'"
            ) from e

    def fast_forward(self, branch_name: str) -> None:
        """
        Fast forward the current branch to the specified branch.

        Args:
            branch_name: The branch to fast forward to

        Raises:
            ValueError: If the branch doesn't exist (errors raised by
                ``BranchManager.fast_forward_validate`` also propagate)
            RuntimeError: If the branch has no snapshot or copying fails
        """
        BranchManager.fast_forward_validate(branch_name, self.current_branch)
        if not self.branch_exists(branch_name):
            raise ValueError(f"Branch name '{branch_name}' doesn't exist.")

        try:
            branch_snapshot_manager = self._copy_with_branch(
                self.snapshot_manager, branch_name
            )
            earliest_snapshot = branch_snapshot_manager.try_get_earliest_snapshot()
            if earliest_snapshot is None:
                raise RuntimeError(
                    f"Cannot fast forward branch {branch_name}, "
                    "because it does not have snapshot."
                )

            # NOTE(review): simplified implementation. Snapshots, schemas and
            # tags that are newer than the branch's earliest snapshot are NOT
            # removed from the target first, and only the snapshot directory
            # is copied (schema and tag files are not). Confirm whether the
            # remaining steps are still required.
            self.file_io.copy_files(
                branch_snapshot_manager.snapshot_dir,
                self.snapshot_manager.snapshot_dir,
                overwrite=True
            )

            # Invalidate cache if applicable
            if hasattr(self.snapshot_manager, 'invalidate_cache'):
                self.snapshot_manager.invalidate_cache()
        except Exception as e:
            raise RuntimeError(
                f"Exception occurs when fast forward '{branch_name}' "
                f"(directory in {self.branch_path(branch_name)})."
            ) from e

    def branch_exists(self, branch_name: str) -> bool:
        """
        Check if a branch exists.

        Args:
            branch_name: Name of the branch to check

        Returns:
            True if the branch path exists, False otherwise
        """
        return self._file_exists(self.branch_path(branch_name))

    def _validate_branch(self, branch_name: str) -> None:
        """
        Validate branch name and check if it already exists.

        Args:
            branch_name: The branch name to validate

        Raises:
            ValueError: If branch name is invalid or already exists
        """
        BranchManager.validate_branch(branch_name)
        if self.branch_exists(branch_name):
            raise ValueError(f"Branch name '{branch_name}' already exists.")

    @staticmethod
    def _entry_name(file_info) -> Optional[str]:
        """Return the last path component of a listing entry, or None."""
        if hasattr(file_info, 'base_name'):
            return file_info.base_name
        try:
            return os.path.basename(file_info.path)
        except (AttributeError, TypeError):
            # Entry exposes neither ``base_name`` nor a usable ``path``.
            return None

    def branches(self) -> List[str]:
        """
        List all branches.

        Returns:
            List of branch names (directory names under the branch root with
            ``BRANCH_PREFIX`` removed); empty when the branch root is missing

        Raises:
            RuntimeError: If listing the branch root fails
        """
        branch_dir = self._branch_directory()
        try:
            if not self._file_exists(branch_dir):
                return []
            file_infos = self.file_io.list_status(branch_dir)
            if file_infos is None:
                return []
            entry_names = (self._entry_name(info) for info in file_infos)
            return [
                name[len(BRANCH_PREFIX):]
                for name in entry_names
                if name and name.startswith(BRANCH_PREFIX)
            ]
        except Exception as e:
            raise RuntimeError(f"Failed to list branches: {e}") from e

    def _copy_schemas_to_branch(self, branch_name: str, schema_id: int) -> None:
        """
        Copy schemas to the specified branch.

        Every schema id from 0 through ``schema_id`` (inclusive) is looked up
        in the current schema manager; ids for which no schema is returned
        are skipped.

        Args:
            branch_name: The target branch name
            schema_id: The maximum schema id to copy
        """
        branch_schema_manager = SchemaManager(self.file_io, self.branch_path(branch_name))
        for current_id in range(schema_id + 1):
            if self.schema_manager.get_schema(current_id) is None:
                continue
            self.file_io.copy_file(
                self.schema_manager._to_schema_path(current_id),
                branch_schema_manager._to_schema_path(current_id),
                overwrite=True
            )