blob: c06139de1b3a23368f55a98fdb5286694a477898 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from urllib.parse import urlparse
from pypaimon.common.identifier import Identifier
from pypaimon.common.options.config import FuseOptions
from pypaimon.filesystem.local_file_io import LocalFileIO, FuseLocalFileIO
logger = logging.getLogger(__name__)


class FusePathResolver:
    """Resolves FUSE local paths and validates FUSE mount availability.

    Remote locations are mapped to paths under the configured FUSE root.
    Before a FUSE-backed FileIO is handed out, a one-time check confirms
    that the mapped location of the ``default`` database exists locally.
    """

    def __init__(self, options, rest_api):
        """
        Args:
            options: Catalog options. Read through ``options.get`` with the
                ``FuseOptions`` keys and forwarded to the FileIO objects
                created by this class.
            rest_api: Object exposing ``get_database(name)``; used only by
                ``validate``.
        """
        self.fuse_root = options.get(FuseOptions.FUSE_ROOT)
        self.validation_mode = options.get(FuseOptions.FUSE_VALIDATION_MODE, "strict")
        self.fuse_mode = options.get(FuseOptions.FUSE_MODE, "pvfs")
        # Tri-state: None = not validated yet, True = passed, False = failed.
        self._validation_state = None
        self._rest_api = rest_api
        self._options = options

    def resolve_local_path(self, original_path, identifier=None):
        """
        Resolve FUSE local path.

        In 'pvfs' mode, the database/object logical names from ``identifier``
        build the path; ``original_path`` is not consulted. If the identifier
        has no object name, the database-level path is returned (used for
        validation).

        In 'raw' mode, the path component of ``original_path`` is appended to
        the FUSE root. When the path has no URI scheme, its first segment is
        dropped so that scheme-less and scheme-based inputs line up.

        Args:
            original_path: Remote path or URI.
            identifier: Object exposing ``get_database_name()`` and
                ``get_object_name()``; required in 'pvfs' mode.

        Returns:
            Local path

        Raises:
            ValueError: If fuse.root is not configured, if 'pvfs' mode is
                used without an identifier, or if fuse.mode is not one of
                the supported values.
        """
        if not self.fuse_root:
            raise ValueError(
                "FUSE local path is enabled but fuse.root is not configured"
            )

        root = self.fuse_root.rstrip('/')

        if self.fuse_mode == "pvfs":
            if identifier is None:
                raise ValueError(
                    "FUSE path mode 'pvfs' requires an Identifier to resolve "
                    "the local path, but identifier is None."
                )
            db = identifier.get_database_name()
            obj = identifier.get_object_name()
            if obj:
                return "{}/{}/{}".format(root, db, obj)
            return "{}/{}".format(root, db)

        if self.fuse_mode == "raw":
            # raw mode: use URI path segments directly
            uri = urlparse(original_path)
            path_part = uri.path.lstrip('/')
            if not uri.scheme:
                # No scheme means path like "catalog/db/table",
                # skip the first segment (catalog name) to align with
                # scheme-based paths
                segments = path_part.split('/')
                if len(segments) > 1:
                    path_part = '/'.join(segments[1:])
            return "{}/{}".format(root, path_part)

        raise ValueError(
            "Invalid fuse.mode: '{}'. "
            "Supported modes are 'pvfs' and 'raw'.".format(self.fuse_mode)
        )

    def validate(self):
        """
        Validate FUSE local path is correctly mounted.

        Fetches the ``default`` database through the REST API, maps its
        location to a local path and checks that the path exists. The outcome
        is stored in the validation state. With validation mode "none", or
        when the default database has no location, the check is skipped and
        the state is marked as passed.

        Raises:
            ValueError: In "strict" mode when the local path does not exist,
                or when the local path cannot be resolved. Exceptions raised
                by the REST call are not caught here.
        """
        if self.validation_mode == "none":
            self._validation_state = True
            return

        # Get default database details, API call failure raises exception directly
        db = self._rest_api.get_database("default")
        remote_location = db.location
        if not remote_location:
            logger.info("Default database has no location, skipping FUSE validation")
            self._validation_state = True
            return

        # Identifier without an object name -> database-level path in pvfs mode.
        expected_local = self.resolve_local_path(
            remote_location, Identifier.create("default", None)
        )
        local_file_io = LocalFileIO(expected_local, self._options)

        # Missing local path is handled according to the validation mode.
        if not local_file_io.exists(expected_local):
            error_msg = (
                "FUSE local path validation failed: "
                "local path '{}' does not exist "
                "for default database location '{}'".format(expected_local, remote_location)
            )
            self._handle_validation_error(error_msg)
        else:
            self._validation_state = True
            logger.info("FUSE local path validation passed")

    def _handle_validation_error(self, error_msg):
        """Handle validation error based on validation mode.

        "strict" raises ``ValueError``. Every other mode records the failure
        so callers fall back to the non-FUSE FileIO. An unrecognized mode is
        treated like "warn" and reported in the log; leaving the state unset
        would make ``get_file_io`` repeat the validation on every call.

        Args:
            error_msg: Description of the validation failure.

        Raises:
            ValueError: When the validation mode is "strict".
        """
        if self.validation_mode == "strict":
            raise ValueError(error_msg)

        if self.validation_mode == "warn":
            logger.warning("%s. Falling back to default FileIO.", error_msg)
        else:
            logger.warning(
                "%s. Unrecognized FUSE validation mode '%s'; "
                "falling back to default FileIO.",
                error_msg, self.validation_mode,
            )
        # Mark validation failed, fallback to default FileIO
        self._validation_state = False

    def get_file_io(self, table_path, identifier, data_token_enabled,
                    rest_token_file_io_factory, default_file_io_factory):
        """
        Get FileIO for data access, supporting FUSE local path mapping.

        Args:
            table_path: The remote table path
            identifier: Table identifier
            data_token_enabled: Whether data token is enabled
            rest_token_file_io_factory: Factory callable for RESTTokenFileIO
            default_file_io_factory: Factory callable for default FileIO

        Returns:
            FileIO instance (FuseLocalFileIO or fallback)

        Raises:
            ValueError: If the local path cannot be resolved, or if
                validation fails in "strict" mode.
        """
        # Configuration error raises exception directly
        local_path = self.resolve_local_path(table_path, identifier)

        # Perform validation (only once)
        if self._validation_state is None:
            self.validate()

        # Validation passed, return FUSE-aware local FileIO
        if self._validation_state:
            return FuseLocalFileIO(
                path=table_path.rstrip('/'),
                fuse_path=local_path.rstrip('/'),
                catalog_options=self._options,
            )

        # Validation failed without raising, fallback to non-FUSE FileIO
        if data_token_enabled:
            return rest_token_file_io_factory()
        return default_file_io_factory()

    @property
    def validation_state(self):
        """Validation outcome: None (not validated), True (passed) or False (failed)."""
        return self._validation_state

    @validation_state.setter
    def validation_state(self, value):
        self._validation_state = value