blob: 15f9f9f8dcd039bf19891cdaf1704157b96893bf [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import importlib
import time
from abc import ABC
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional, Tuple
import fsspec
from cachetools import LRUCache, TTLCache
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from readerwriterlock import rwlock
from pypaimon.api.api_response import GetTableResponse, GetTableTokenResponse
from pypaimon.api.client import AlreadyExistsException, NoSuchResourceException
from pypaimon.api.rest_api import RESTApi
from pypaimon.common.config import CatalogOptions, OssOptions, PVFSOptions
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
PROTOCOL_NAME = "pvfs"
class StorageType(Enum):
LOCAL = "file"
OSS = "oss"
class PVFSIdentifier(ABC):
catalog: str
endpoint: str
def get_cache_key(self) -> str:
return "{}.{}".format(self.catalog, self.__remove_endpoint_schema(self.endpoint))
@staticmethod
def __remove_endpoint_schema(url):
if url.startswith('https://'):
return url[8:]
elif url.startswith('http://'):
return url[7:]
return url
@dataclass
class PVFSCatalogIdentifier(PVFSIdentifier):
catalog: str
endpoint: str
@dataclass
class PVFSDatabaseIdentifier(PVFSIdentifier):
catalog: str
endpoint: str
database: str
@dataclass
class PVFSTableIdentifier(PVFSIdentifier):
catalog: str
endpoint: str
database: str
table: str
sub_path: str = None
def __hash__(self) -> int:
return hash((self.catalog, self.database, self.table))
def __eq__(self, __value: Any) -> bool:
if isinstance(__value, PVFSTableIdentifier):
return self.catalog == __value.catalog and self.database == __value.database and self.table == __value.table
return False
def get_actual_path(self, storage_location: str):
if self.sub_path:
return '{}/{}'.format(storage_location.rstrip("/"), self.sub_path.lstrip("/"))
return storage_location
def get_virtual_location(self):
return ('{}://{}'.format(PROTOCOL_NAME, self.catalog) +
'/{}/{}'.format(self.database, self.table))
def get_identifier(self):
return Identifier.create(self.database, self.table)
def name(self):
return '{}.{}.{}'.format(self.catalog, self.database, self.table)
@dataclass
class PaimonRealStorage:
TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000
token: Dict[str, str]
expires_at_millis: Optional[int]
file_system: AbstractFileSystem
def need_refresh(self) -> bool:
if self.expires_at_millis is not None:
return self.expires_at_millis - int(time.time() * 1000) < self.TOKEN_EXPIRATION_SAFE_TIME_MILLIS
return False
@dataclass
class TableStore:
path: str
created: datetime
modified: datetime
class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
options: Dict[str, Any]
protocol = PROTOCOL_NAME
def __init__(self, options: Dict = None, **kwargs):
options.update({CatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'})
self.options = options
self.warehouse = options.get(CatalogOptions.WAREHOUSE)
cache_expired_time = (
PVFSOptions.DEFAULT_TABLE_CACHE_TTL
if options is None
else options.get(
PVFSOptions.TABLE_CACHE_TTL, PVFSOptions.DEFAULT_TABLE_CACHE_TTL
)
)
self._cache_enable = options.get(PVFSOptions.CACHE_ENABLED, True)
if self._cache_enable:
self._table_cache = TTLCache(maxsize=PVFSOptions.DEFAULT_CACHE_SIZE, ttl=cache_expired_time)
self._table_cache_lock = rwlock.RWLockFair()
self._rest_api_cache = LRUCache(PVFSOptions.DEFAULT_CACHE_SIZE)
self._fs_cache = LRUCache(maxsize=PVFSOptions.DEFAULT_CACHE_SIZE)
self._table_cache_lock = rwlock.RWLockFair()
self._rest_api_cache_lock = rwlock.RWLockFair()
self._fs_cache_lock = rwlock.RWLockFair()
super().__init__(**kwargs)
@property
def fsid(self):
return PROTOCOL_NAME
def sign(self, path, expiration=None, **kwargs):
"""We do not support to create a signed URL representing the given path in gvfs."""
raise Exception(
"Sign is not implemented for Paimon Virtual FileSystem."
)
def ls(self, path, detail=True, **kwargs):
pvfs_identifier = self._extract_pvfs_identifier(path)
rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
databases = rest_api.list_databases()
if detail:
return [
self._create_dir_detail(
self._convert_database_virtual_path(pvfs_identifier.catalog, database)
)
for database in databases
]
return [
self._convert_database_virtual_path(pvfs_identifier.catalog, database)
for database in databases
]
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
tables = rest_api.list_tables(pvfs_identifier.database)
if detail:
return [
self._create_dir_detail(
self._convert_table_virtual_path(pvfs_identifier.catalog, pvfs_identifier.database, table)
)
for table in tables
]
return [
self._convert_table_virtual_path(pvfs_identifier.catalog, pvfs_identifier.database, table)
for table in tables
]
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
table_path = self._get_table_store(rest_api, pvfs_identifier).path
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
virtual_location = pvfs_identifier.get_virtual_location()
fs = self._get_filesystem(pvfs_identifier, storage_type)
entries = fs.ls(actual_path, detail=detail, **kwargs)
if detail:
virtual_entities = [
self._convert_actual_info(entry, storage_type, storage_location, virtual_location)
for entry in entries
]
return virtual_entities
else:
virtual_entry_paths = [
self._convert_actual_path(
storage_type, entry_path, storage_location, virtual_location
)
for entry_path in entries
]
return virtual_entry_paths
def info(self, path, **kwargs):
pvfs_identifier = self._extract_pvfs_identifier(path)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
return self._create_dir_detail('{}://{}'.format(PROTOCOL_NAME, pvfs_identifier.catalog))
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
return self._create_dir_detail(
self._convert_database_virtual_path(pvfs_identifier.catalog, pvfs_identifier.database)
)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
rest_api = self.__rest_api(pvfs_identifier)
table_path = self._get_table_store(rest_api, pvfs_identifier).path
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
virtual_location = pvfs_identifier.get_virtual_location()
fs = self._get_filesystem(pvfs_identifier, storage_type)
entry = fs.info(actual_path)
return self._convert_actual_info(entry, storage_type, storage_location, virtual_location)
def exists(self, path, **kwargs):
pvfs_identifier = self._extract_pvfs_identifier(path)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
return True
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
rest_api = self.__rest_api(pvfs_identifier)
try:
rest_api.get_database(pvfs_identifier.database)
return True
except NoSuchResourceException:
return False
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
rest_api = self.__rest_api(pvfs_identifier)
try:
table_path = self._get_table_store(rest_api, pvfs_identifier).path
if table_path is not None and pvfs_identifier.sub_path is None:
return True
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.exists(actual_path)
except NoSuchResourceException:
return False
def cp_file(self, path1, path2, **kwargs):
source = self._extract_pvfs_identifier(path1)
target = self._extract_pvfs_identifier(path2)
if ((isinstance(source, PVFSTableIdentifier)
and isinstance(target, PVFSTableIdentifier))
and target.sub_path is not None
and source.sub_path is not None
and source == target):
rest_api = self.__rest_api(source)
table_path = self._get_table_store(rest_api, source).path
storage_type = self._get_storage_type(table_path)
storage_location = table_path
source_actual_path = source.get_actual_path(storage_location)
target_actual_path = target.get_actual_path(storage_location)
fs = self._get_filesystem(source, storage_type)
fs.cp_file(
self._strip_storage_protocol(storage_type, source_actual_path),
self._strip_storage_protocol(storage_type, target_actual_path),
)
return None
raise Exception("cp is not supported for path: {} to path: {}".format(path1, path2))
def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
source = self._extract_pvfs_identifier(path1)
target = self._extract_pvfs_identifier(path2)
if (isinstance(source, PVFSTableIdentifier) and
isinstance(target, PVFSTableIdentifier) and
target.catalog == source.catalog):
rest_api = self.__rest_api(source)
if target.sub_path is None and source.sub_path is None:
source_identifier = Identifier.create(source.database, source.table)
target_identifier = Identifier.create(target.database, target.table)
rest_api.rename_table(source_identifier, target_identifier)
return None
elif target.sub_path is not None and source.sub_path is not None and target == source:
table_path = self._get_table_store(rest_api, source).path
storage_type = self._get_storage_type(table_path)
storage_location = table_path
source_actual_path = source.get_actual_path(storage_location)
target_actual_path = target.get_actual_path(storage_location)
fs = self._get_filesystem(source, storage_type)
if storage_type == StorageType.LOCAL:
fs.mv(
self._strip_storage_protocol(storage_type, source_actual_path),
self._strip_storage_protocol(storage_type, target_actual_path),
recursive=recursive,
maxdepth=maxdepth
)
else:
fs.mv(
self._strip_storage_protocol(storage_type, source_actual_path),
self._strip_storage_protocol(storage_type, target_actual_path),
)
return None
raise Exception("Mv is not supported for path: {} to path: {}".format(path1, path2))
def rm(self, path, recursive=False, maxdepth=None):
pvfs_identifier = self._extract_pvfs_identifier(path)
rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
database_name = pvfs_identifier.database
if not recursive and len(rest_api.list_tables(database_name)) > 0:
raise Exception('Recursive is False but database is not empty')
rest_api.drop_database(database_name)
if self._table_cache:
for table in rest_api.list_tables(database_name):
table_pvfs_identifier = PVFSTableIdentifier(
pvfs_identifier.catalog, database_name, table
).name()
self._table_cache.pop(table_pvfs_identifier)
return True
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
table_identifier = pvfs_identifier.get_identifier()
table_path = self._get_table_store(rest_api, pvfs_identifier).path
if pvfs_identifier.sub_path is None:
rest_api.drop_table(table_identifier)
self._table_cache.pop(table_identifier.name())
return True
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.rm(
self._strip_storage_protocol(storage_type, actual_path),
recursive,
maxdepth,
)
raise Exception(
"Rm is not supported for path: {}.".format(path)
)
def rm_file(self, path):
pvfs_identifier = self._extract_pvfs_identifier(path)
if isinstance(pvfs_identifier, PVFSTableIdentifier):
rest_api = self.__rest_api(pvfs_identifier)
table_path = self._get_table_store(rest_api, pvfs_identifier).path
if pvfs_identifier.sub_path is not None:
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.rm_file(
self._strip_storage_protocol(storage_type, actual_path),
)
raise Exception("Rm is not supported for path: {}.".format(path))
def rmdir(self, path):
files = self.ls(path)
if len(files) == 0:
pvfs_identifier = self._extract_pvfs_identifier(path)
rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
database_name = pvfs_identifier.database
rest_api.drop_database(database_name)
return True
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
table_identifier = pvfs_identifier.get_identifier()
table_path = self._get_table_store(rest_api, pvfs_identifier).path
if pvfs_identifier.sub_path is None:
rest_api.drop_table(table_identifier)
self._cache.pop(pvfs_identifier)
return True
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.rmdir(
self._strip_storage_protocol(storage_type, actual_path)
)
raise Exception("Rm dir is not supported for path: {}.".format(path))
else:
raise Exception("Rm dir is not supported for path: {} as it is not empty.".format(path))
def open(
self,
path,
mode="rb",
block_size=None,
cache_options=None,
compression=None,
**kwargs
):
pvfs_identifier = self._extract_pvfs_identifier(path)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception("open is not supported for path: {}".format(path))
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
raise Exception(
"open is not supported for path: {}".format(path)
)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
rest_api = self.__rest_api(pvfs_identifier)
table_path = self._get_table_store(rest_api, pvfs_identifier).path
if pvfs_identifier.sub_path is None:
raise Exception("open is not supported for path: {}".format(path))
else:
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.open(
self._strip_storage_protocol(storage_type, actual_path),
mode,
block_size,
cache_options,
compression,
**kwargs
)
def mkdir(self, path, create_parents=True, **kwargs):
pvfs_identifier = self._extract_pvfs_identifier(path)
rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception("mkdir is not supported for path: {}".format(path))
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
rest_api.create_database(pvfs_identifier.database, {})
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
table_path: str
if pvfs_identifier.sub_path is None:
if create_parents:
try:
rest_api.create_database(pvfs_identifier.database, {})
except AlreadyExistsException:
pass
self._create_object_table(pvfs_identifier)
else:
if create_parents:
try:
rest_api.create_database(pvfs_identifier.database, {})
except AlreadyExistsException:
pass
try:
table_path = self._get_table_store(rest_api, pvfs_identifier).path
except NoSuchResourceException:
try:
self._create_object_table(pvfs_identifier)
except AlreadyExistsException:
pass
finally:
table_path = self._get_table_store(rest_api, pvfs_identifier).path
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.mkdir(
self._strip_storage_protocol(storage_type, actual_path),
create_parents,
**kwargs
)
def makedirs(self, path, exist_ok=True):
pvfs_identifier = self._extract_pvfs_identifier(path)
rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception("makedirs is not supported for path: {}".format(path))
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
try:
rest_api.create_database(pvfs_identifier.catalog, {})
except AlreadyExistsException as e:
if exist_ok:
pass
raise e
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
if pvfs_identifier.sub_path is None:
try:
self._create_object_table(pvfs_identifier)
except AlreadyExistsException as e:
if exist_ok:
pass
raise e
else:
try:
self._create_object_table(pvfs_identifier)
except AlreadyExistsException as e:
if exist_ok:
pass
else:
raise e
table_path = self._get_table_store(rest_api, pvfs_identifier).path
storage_type = self._get_storage_type(table_path)
storage_location = table_path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.makedirs(
self._strip_storage_protocol(storage_type, actual_path),
exist_ok
)
def created(self, path):
pvfs_identifier = self._extract_pvfs_identifier(path)
rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception("created is not supported for path: {}".format(path))
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
return self.__converse_ts_to_datatime(rest_api.get_database(pvfs_identifier.database).created_at)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
if pvfs_identifier.sub_path is None:
return self._get_table_store(rest_api, pvfs_identifier).created
else:
table = self._get_table_store(rest_api, pvfs_identifier)
storage_type = self._get_storage_type(table.path)
storage_location = table.path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.created(
self._strip_storage_protocol(storage_type, actual_path)
)
def modified(self, path):
pvfs_identifier = self._extract_pvfs_identifier(path)
rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception("modified is not supported for path: {}".format(path))
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
return self.__converse_ts_to_datatime(rest_api.get_database(pvfs_identifier.database).updated_at)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
table_identifier = pvfs_identifier.get_identifier()
if pvfs_identifier.sub_path is None:
return self._get_table_store(rest_api, pvfs_identifier).modified
else:
table = rest_api.get_table(table_identifier)
storage_type = self._get_storage_type(table.path)
storage_location = table.path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.modified(
self._strip_storage_protocol(storage_type, actual_path)
)
def cat_file(self, path, start=None, end=None, **kwargs):
pvfs_identifier = self._extract_pvfs_identifier(path)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception(
"cat file is not supported for path: {}".format(path)
)
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
raise Exception(
"cat file is not supported for path: {}".format(path)
)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
if pvfs_identifier.sub_path is None:
raise Exception(
"cat file is not supported for path: {}".format(path)
)
else:
rest_api = self.__rest_api(pvfs_identifier)
table = self._get_table_store(rest_api, pvfs_identifier)
storage_type = self._get_storage_type(table.path)
storage_location = table.path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.cat_file(
self._strip_storage_protocol(storage_type, actual_path),
start,
end,
**kwargs,
)
def get_file(self, rpath, lpath, callback=None, outfile=None, **kwargs):
pvfs_identifier = self._extract_pvfs_identifier(rpath)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception(
"get file is not supported for path: {}".format(rpath)
)
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
raise Exception(
"get file is not supported for path: {}".format(rpath)
)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
rest_api = self.__rest_api(pvfs_identifier)
if pvfs_identifier.sub_path is None:
raise Exception(
"get file is not supported for path: {}".format(rpath)
)
else:
table = self._get_table_store(rest_api, pvfs_identifier)
storage_type = self._get_storage_type(table.path)
storage_location = table.path
actual_path = pvfs_identifier.get_actual_path(storage_location)
fs = self._get_filesystem(pvfs_identifier, storage_type)
return fs.get_file(
self._strip_storage_protocol(storage_type, actual_path),
lpath,
**kwargs
)
def _rm(self, path):
raise Exception(
"_rm is not implemented for Paimon Virtual FileSystem."
)
def _create_object_table(self, pvfs_identifier: PVFSTableIdentifier):
rest_api = self.__rest_api(pvfs_identifier)
schema = Schema(options={'type': 'object-table'})
table_identifier = pvfs_identifier.get_identifier()
rest_api.create_table(table_identifier, schema)
@staticmethod
def __converse_ts_to_datatime(ts: int):
return datetime.datetime.fromtimestamp(ts / 1000, tz=datetime.timezone.utc)
@staticmethod
def _strip_storage_protocol(storage_type: StorageType, path: str):
if storage_type == StorageType.LOCAL:
return path[len("{}:".format(StorageType.LOCAL.value)):]
# OSS has different behavior than S3 and GCS, if we do not remove the
# protocol, it will always return an empty array.
if storage_type == StorageType.OSS:
if path.startswith("{}://".format(StorageType.OSS.value)):
return path[len("{}://".format(StorageType.OSS.value)):]
return path
raise Exception(
"Storage type:{} doesn't support now.".format(storage_type)
)
@staticmethod
def _convert_actual_info(
entry: Dict,
storage_type: StorageType,
storage_location: str,
virtual_location: str,
):
path = PaimonVirtualFileSystem._convert_actual_path(storage_type, entry["name"], storage_location,
virtual_location)
if "mtime" in entry:
# HDFS and GCS
return PaimonVirtualFileSystem._create_file_detail(path, entry["size"], entry["type"], entry["mtime"])
elif "LastModified" in entry:
# S3 and OSS
return PaimonVirtualFileSystem._create_file_detail(path, entry["size"], entry["type"],
entry["LastModified"])
# Unknown
return PaimonVirtualFileSystem._create_file_detail(path, entry["size"], entry["type"])
@staticmethod
def _create_dir_detail(path: str) -> Dict[str, Any]:
return PaimonVirtualFileSystem._create_file_detail(path, 0, 'directory', None)
@staticmethod
def _create_file_detail(name: str, size: int, type: str, mtime: int = None) -> Dict[str, Any]:
return {
"name": name,
"size": size,
"type": type,
"mtime": mtime,
}
@staticmethod
def _convert_database_virtual_path(
catalog_name: str,
database_name: str
):
return '{}://{}/{}'.format(PROTOCOL_NAME, catalog_name, database_name)
@staticmethod
def _convert_table_virtual_path(
catalog_name: str,
database_name: str,
table_name: str
):
return '{}://{}/{}/{}'.format(PROTOCOL_NAME, catalog_name, database_name, table_name)
@staticmethod
def _convert_actual_path(
storage_type: StorageType,
actual_path: str,
storage_location: str,
virtual_location: str,
):
actual_path = PaimonVirtualFileSystem._get_path_without_schema(storage_type, actual_path)
storage_location = PaimonVirtualFileSystem._get_path_without_schema(storage_type, storage_location)
normalized_pvfs = virtual_location.rstrip('/')
sub_location = actual_path[len(storage_location):].lstrip("/")
if len(sub_location) == 0:
return normalized_pvfs
else:
return '{}/{}'.format(normalized_pvfs, sub_location)
@staticmethod
def _get_path_without_schema(storage_type: StorageType, path: str) -> str:
if storage_type == StorageType.LOCAL and path.startswith(StorageType.LOCAL.value):
return path[len("{}://".format(StorageType.LOCAL.value)):]
elif storage_type == StorageType.OSS and path.startswith(StorageType.OSS.value):
return path[len("{}://".format(StorageType.OSS.value)):]
return path
def _extract_pvfs_identifier(self, path: str) -> Optional['PVFSIdentifier']:
if not isinstance(path, str):
raise Exception("path is not a string")
path_without_protocol = path
if path.startswith('{}://'.format(PROTOCOL_NAME)):
path_without_protocol = path[7:]
if not path_without_protocol:
return None
components = [component for component in path_without_protocol.rstrip('/').split('/') if component]
catalog: str = None
endpoint: str = self.options.get(CatalogOptions.URI)
if len(components) > 0:
if '.' in components[0]:
(catalog, endpoint) = components[0].split('.', 1)
else:
catalog = components[0]
if len(components) == 0:
return None
elif len(components) == 1:
return (PVFSCatalogIdentifier(endpoint=endpoint, catalog=catalog))
elif len(components) == 2:
return PVFSDatabaseIdentifier(endpoint=endpoint, catalog=catalog, database=components[1])
elif len(components) == 3:
return PVFSTableIdentifier(endpoint=endpoint, catalog=catalog, database=components[1], table=components[2])
elif len(components) > 3:
sub_path = '/'.join(components[3:])
return PVFSTableIdentifier(
endpoint=endpoint,
catalog=catalog, database=components[1],
table=components[2], sub_path=sub_path
)
return None
def _get_table_store(self, rest_api: RESTApi, pvfs_identifier: PVFSTableIdentifier) -> Optional['TableStore']:
if not self._cache_enable:
table = rest_api.get_table(Identifier.create(pvfs_identifier.database, pvfs_identifier.table))
return self._create_table_store(table)
read_lock = self._table_cache_lock.gen_rlock()
try:
read_lock.acquire()
cache_value: Tuple[str, TableStore] = self._table_cache.get(
pvfs_identifier.name()
)
if cache_value is not None:
return cache_value
finally:
read_lock.release()
write_lock = self._table_cache_lock.gen_wlock()
try:
write_lock.acquire()
table = rest_api.get_table(Identifier.create(pvfs_identifier.database, pvfs_identifier.table))
if table is not None:
table_store = self._create_table_store(table)
self._table_cache[pvfs_identifier.name()] = table_store
return table_store
else:
return None
finally:
write_lock.release()
def _create_table_store(self, table: GetTableResponse):
created = self.__converse_ts_to_datatime(table.created_at)
modified = self.__converse_ts_to_datatime(table.updated_at)
return TableStore(path=table.path, created=created, modified=modified)
def __rest_api(self, pvfs_identifier: PVFSIdentifier):
read_lock = self._rest_api_cache_lock.gen_rlock()
catalog = pvfs_identifier.catalog
if pvfs_identifier.endpoint is None or catalog is None:
raise ValueError("Endpoint or catalog is not set.")
key = pvfs_identifier.get_cache_key()
try:
read_lock.acquire()
rest_api = self._rest_api_cache.get(key)
if rest_api is not None:
return rest_api
finally:
read_lock.release()
write_lock = self._rest_api_cache_lock.gen_wlock()
try:
write_lock.acquire()
rest_api = self._rest_api_cache.get(key)
if rest_api is None:
options = self.options.copy()
options.update({CatalogOptions.WAREHOUSE: catalog, CatalogOptions.URI: pvfs_identifier.endpoint})
rest_api = RESTApi(options)
self._rest_api_cache[catalog] = rest_api
return rest_api
finally:
write_lock.release()
def _get_filesystem(self, pvfs_table_identifier: PVFSTableIdentifier, storage_type: StorageType) -> 'FileSystem':
read_lock = self._fs_cache_lock.gen_rlock()
try:
read_lock.acquire()
cache_value: Tuple[StorageType, AbstractFileSystem] = self._fs_cache.get(
storage_type
)
if cache_value is not None:
return cache_value
finally:
read_lock.release()
write_lock = self._table_cache_lock.gen_wlock()
try:
write_lock.acquire()
cache_value: PaimonRealStorage = self._fs_cache.get(pvfs_table_identifier)
if cache_value is not None and cache_value.need_refresh() is False:
return cache_value.file_system
if storage_type == StorageType.LOCAL:
fs = LocalFileSystem()
elif storage_type == StorageType.OSS:
rest_api = self.__rest_api(pvfs_table_identifier)
load_token_response: GetTableTokenResponse = rest_api.load_table_token(
Identifier.create(pvfs_table_identifier.database, pvfs_table_identifier.table))
fs = self._get_oss_filesystem(load_token_response.token)
paimon_real_storage = PaimonRealStorage(
token=load_token_response.token,
expires_at_millis=load_token_response.expires_at_millis,
file_system=fs
)
self._fs_cache[pvfs_table_identifier] = paimon_real_storage
else:
raise Exception(
"Storage type: `{}` doesn't support now.".format(storage_type)
)
return fs
finally:
write_lock.release()
@staticmethod
def _get_storage_type(path: str):
if path.startswith("{}:/".format(StorageType.LOCAL.value)):
return StorageType.LOCAL
elif path.startswith("{}://".format(StorageType.OSS.value)):
return StorageType.OSS
raise Exception(
"Storage type doesn't support now. Path:{}".format(path)
)
@staticmethod
def _get_oss_filesystem(options: Dict[str, str]) -> AbstractFileSystem:
access_key_id = options.get(OssOptions.OSS_ACCESS_KEY_ID)
if access_key_id is None:
raise Exception(
"OSS access key id is not found in the options."
)
access_key_secret = options.get(
OssOptions.OSS_ACCESS_KEY_SECRET
)
if access_key_secret is None:
raise Exception(
"OSS access key secret is not found in the options."
)
oss_endpoint_url = options.get(OssOptions.OSS_ENDPOINT)
if oss_endpoint_url is None:
raise Exception(
"OSS endpoint url is not found in the options."
)
token = options.get(OssOptions.OSS_SECURITY_TOKEN)
return importlib.import_module("ossfs").OSSFileSystem(
key=access_key_id,
secret=access_key_secret,
token=token,
endpoint=oss_endpoint_url
)