blob: 771c6b5a0f911143d4a6b670bb3f31490836db7d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib
import logging
import uuid
from abc import ABC, abstractmethod
import mmh3
from pyiceberg.partitioning import PartitionKey
from pyiceberg.typedef import Properties
from pyiceberg.utils.properties import property_as_bool
logger = logging.getLogger(__name__)
class LocationProvider(ABC):
    """Abstract strategy that decides where a table's files are written.

    Subclasses choose data file locations via ``new_data_location``; metadata file
    locations are resolved here, relative to ``metadata_path``.

    Args:
        table_location (str): The table's base storage location.
        table_properties (Properties): The table's properties.
    """

    table_location: str
    table_properties: Properties

    data_path: str
    metadata_path: str

    def __init__(self, table_location: str, table_properties: Properties):
        self.table_location = table_location
        self.table_properties = table_properties

        from pyiceberg.table import TableProperties

        def resolve(property_key: str, default_dir: str) -> str:
            # A non-empty configured path wins (trailing "/" stripped); otherwise use a
            # sub-directory of the table location.
            configured = table_properties.get(property_key)
            if not configured:
                return f"{self.table_location.rstrip('/')}/{default_dir}"
            return configured.rstrip("/")

        self.data_path = resolve(TableProperties.WRITE_DATA_PATH, "data")
        self.metadata_path = resolve(TableProperties.WRITE_METADATA_PATH, "metadata")

    @abstractmethod
    def new_data_location(self, data_file_name: str, partition_key: PartitionKey | None = None) -> str:
        """Return the fully-qualified location at which a new data file should be written.

        Args:
            data_file_name (str): File name of the data file.
            partition_key (Optional[PartitionKey]): Partition key of the file; None means unpartitioned.

        Returns:
            str: Fully-qualified location URI for the data file.
        """

    def new_table_metadata_file_location(self, new_version: int = 0) -> str:
        """Return a fully-qualified location for the metadata file of a new table version.

        The file name is ``<version zero-padded to at least 5 digits>-<random uuid4>.metadata.json``.

        Args:
            new_version (int): Version number of the metadata file.

        Returns:
            str: Fully-qualified URI for the new table metadata file.

        Raises:
            ValueError: If ``new_version`` is negative.
        """
        if new_version < 0:
            raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
        return self.new_metadata_location(f"{new_version:05d}-{uuid.uuid4()}.metadata.json")

    def new_metadata_location(self, metadata_file_name: str) -> str:
        """Return the fully-qualified location of a metadata file directly under ``metadata_path``.

        Args:
            metadata_file_name (str): File name of the metadata file.

        Returns:
            str: Fully-qualified location URI for the metadata file.
        """
        return f"{self.metadata_path}/{metadata_file_name}"
class SimpleLocationProvider(LocationProvider):
    """Location provider that writes data files directly beneath ``data_path``.

    Layout: ``<data_path>/<partition path>/<file>`` for partitioned data and
    ``<data_path>/<file>`` otherwise.
    """

    def __init__(self, table_location: str, table_properties: Properties):
        super().__init__(table_location, table_properties)

    def new_data_location(self, data_file_name: str, partition_key: PartitionKey | None = None) -> str:
        # No (truthy) partition key: the file sits directly in the data directory.
        if not partition_key:
            return f"{self.data_path}/{data_file_name}"
        partition_dir = partition_key.to_path()
        return f"{self.data_path}/{partition_dir}/{data_file_name}"
class ObjectStoreLocationProvider(LocationProvider):
    """Location provider that prefixes data files with directories derived from a murmur3 hash.

    Layouts produced by ``new_data_location``:
      * partition paths enabled:  ``<data_path>/<hash dirs>/<partition path>/<file>``
      * partition paths disabled: ``<data_path>/<hash dirs>-<file>`` (partition key ignored)

    The hash is presumably there to spread files across many object-store key prefixes.
    """

    HASH_BINARY_STRING_BITS = 20
    ENTROPY_DIR_LENGTH = 4
    ENTROPY_DIR_DEPTH = 3

    # Whether the partition path is kept in data file locations.
    _include_partition_paths: bool

    def __init__(self, table_location: str, table_properties: Properties):
        super().__init__(table_location, table_properties)
        from pyiceberg.table import TableProperties

        self._include_partition_paths = property_as_bool(
            self.table_properties,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT,
        )

    def new_data_location(self, data_file_name: str, partition_key: PartitionKey | None = None) -> str:
        keep_partitions = self._include_partition_paths
        if keep_partitions and partition_key:
            # Fold the partition path into the file name and re-enter, so the hash covers both.
            return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")

        entropy_dirs = self._compute_hash(data_file_name)
        separator = "/" if keep_partitions else "-"
        return f"{self.data_path}/{entropy_dirs}{separator}{data_file_name}"

    @staticmethod
    def _compute_hash(data_file_name: str) -> str:
        """Return the low HASH_BINARY_STRING_BITS bits of the murmur3 hash as '/'-separated directories."""
        bit_count = ObjectStoreLocationProvider.HASH_BINARY_STRING_BITS
        sentinel = 1 << bit_count
        # The AND keeps only the low bits (discarding the sign of the 32-bit hash); the OR sets the
        # bit just above them so `bin` cannot drop leading zeroes from the part we slice off.
        low_bits = mmh3.hash(data_file_name) & (sentinel - 1)
        binary = bin(low_bits | sentinel)
        return ObjectStoreLocationProvider._dirs_from_hash(binary[-bit_count:])

    @staticmethod
    def _dirs_from_hash(file_hash: str) -> str:
        """Split a hash string into ENTROPY_DIR_DEPTH directories of ENTROPY_DIR_LENGTH characters.

        Whatever remains after those directories becomes one final path segment. Per the original
        author, this layout is meant to optimize the orphan-file removal operation.
        """
        width = ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH
        dirs_span = ObjectStoreLocationProvider.ENTROPY_DIR_DEPTH * width
        segments = [file_hash[start : start + width] for start in range(0, dirs_span, width)]
        tail = file_hash[dirs_span:]
        if tail:
            segments.append(tail)
        return "/".join(segments)
def _import_location_provider(
    location_provider_impl: str, table_location: str, table_properties: Properties
) -> LocationProvider | None:
    """Import and instantiate a user-supplied LocationProvider class.

    Args:
        location_provider_impl (str): Dotted path of the class, e.g. ``module.CustomLocationProvider``.
        table_location (str): The table's base storage location, passed to the class constructor.
        table_properties (Properties): The table's properties, passed to the class constructor.

    Returns:
        Optional[LocationProvider]: The instantiated provider, or None if the module cannot be
        found (including a ModuleNotFoundError raised while constructing the provider) or the
        module has no attribute with the given class name.

    Raises:
        ValueError: If ``location_provider_impl`` is not a dotted ``module.Class`` path.
    """
    path_parts = location_provider_impl.split(".")
    if len(path_parts) < 2:
        from pyiceberg.table import TableProperties

        raise ValueError(
            f"{TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL} should be full path "
            f"(module.CustomLocationProvider), got: {location_provider_impl}"
        )
    module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]

    try:
        module = importlib.import_module(module_name)
        class_ = getattr(module, class_name, None)
        if class_ is None:
            # Treat a missing class like a missing module, so the caller reports both with its
            # uniform "Could not initialize" ValueError instead of a bare AttributeError.
            logger.warning(
                "Could not initialize LocationProvider: %s (module %s has no attribute %s)",
                location_provider_impl,
                module_name,
                class_name,
            )
            return None
        # Construction stays inside the try so a ModuleNotFoundError raised by the provider's
        # constructor is still reported as "could not initialize".
        return class_(table_location, table_properties)
    except ModuleNotFoundError:
        logger.warning(
            "Could not initialize LocationProvider: %s",
            location_provider_impl,
            exc_info=logger.isEnabledFor(logging.DEBUG),
        )
        return None
def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider:
    """Return the LocationProvider configured for a table.

    Resolution order:
      1. If ``TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL`` is set, import and instantiate that class.
      2. Otherwise, if ``TableProperties.OBJECT_STORE_ENABLED`` is true, use ObjectStoreLocationProvider.
      3. Otherwise, use SimpleLocationProvider.

    Trailing slashes are stripped from ``table_location`` before it is passed to the provider.

    Args:
        table_location (str): The table's base storage location.
        table_properties (Properties): The table's properties.

    Returns:
        LocationProvider: The provider to use for this table.

    Raises:
        ValueError: If a custom provider is configured but could not be initialized, or its
            configured path is not a dotted ``module.Class`` path.
    """
    from pyiceberg.table import TableProperties

    table_location = table_location.rstrip("/")

    if location_provider_impl := table_properties.get(TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL):
        location_provider = _import_location_provider(location_provider_impl, table_location, table_properties)
        # None is the failure sentinel; test it explicitly so a provider instance that happens to be
        # falsy (e.g. defines __len__/__bool__) is not mistaken for a failed load.
        if location_provider is None:
            raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}")
        logger.info("Loaded LocationProvider: %s", location_provider_impl)
        return location_provider

    if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT):
        return ObjectStoreLocationProvider(table_location, table_properties)
    return SimpleLocationProvider(table_location, table_properties)