# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import getpass
import logging
import socket
import time
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Set,
Type,
Union,
)
from urllib.parse import urlparse
from hive_metastore.ThriftHiveMetastore import Client
from hive_metastore.ttypes import (
AlreadyExistsException,
CheckLockRequest,
FieldSchema,
InvalidOperationException,
LockComponent,
LockLevel,
LockRequest,
LockResponse,
LockState,
LockType,
MetaException,
NoSuchObjectException,
SerDeInfo,
StorageDescriptor,
UnlockRequest,
)
from hive_metastore.ttypes import Database as HiveDatabase
from hive_metastore.ttypes import Table as HiveTable
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from pyiceberg.catalog import (
EXTERNAL_TABLE,
ICEBERG,
LOCATION,
METADATA_LOCATION,
TABLE_TYPE,
MetastoreCatalog,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchIcebergTableError,
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
TableAlreadyExistsError,
WaitingForLockException,
)
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
StagedTable,
Table,
TableProperties,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
from pyiceberg.utils.properties import property_as_bool, property_as_float, property_as_int
if TYPE_CHECKING:
import pyarrow as pa
COMMENT = "comment"
OWNER = "owner"
# If set to true, HiveCatalog will operate in Hive2 compatibility mode
HIVE2_COMPATIBLE = "hive.hive2-compatible"
HIVE2_COMPATIBLE_DEFAULT = False
LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
LOCK_CHECK_RETRIES = "lock-check-retries"
DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1 # 100 milliseconds
DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60 # 1 min
DEFAULT_LOCK_CHECK_RETRIES = 4
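# A hedged sketch of tuning these via catalog properties; the endpoint and the
# values below are illustrative only, not defaults:
#
#     catalog = HiveCatalog(
#         "hive",
#         **{
#             "uri": "thrift://localhost:9083",
#             "lock-check-min-wait-time": "0.2",
#             "lock-check-max-wait-time": "30",
#             "lock-check-retries": "5",
#         },
#     )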
logger = logging.getLogger(__name__)
class _HiveClient:
"""Helper class to nicely open and close the transport."""
_transport: TTransport
_client: Client
_ugi: Optional[List[str]]
def __init__(self, uri: str, ugi: Optional[str] = None):
url_parts = urlparse(uri)
transport = TSocket.TSocket(url_parts.hostname, url_parts.port)
self._transport = TTransport.TBufferedTransport(transport)
        # Build the protocol on the buffered transport (not the raw socket) so buffering takes effect.
        protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
self._client = Client(protocol)
self._ugi = ugi.split(":") if ugi else None
def __enter__(self) -> Client:
self._transport.open()
if self._ugi:
self._client.set_ugi(*self._ugi)
return self._client
def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
self._transport.close()
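# A hedged usage sketch for _HiveClient; the metastore URI is an assumption:
#
#     with _HiveClient("thrift://localhost:9083") as open_client:
#         print(open_client.get_all_databases())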
def _construct_hive_storage_descriptor(
schema: Schema, location: Optional[str], hive2_compatible: bool = False
) -> StorageDescriptor:
ser_de_info = SerDeInfo(serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
return StorageDescriptor(
[
FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter(hive2_compatible)), field.doc)
for field in schema.fields
],
location,
"org.apache.hadoop.mapred.FileInputFormat",
"org.apache.hadoop.mapred.FileOutputFormat",
serdeInfo=ser_de_info,
)
PROP_EXTERNAL = "EXTERNAL"
PROP_TABLE_TYPE = "table_type"
PROP_METADATA_LOCATION = "metadata_location"
PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT}
def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]:
properties = {PROP_EXTERNAL: "TRUE", PROP_TABLE_TYPE: "ICEBERG", PROP_METADATA_LOCATION: metadata_location}
if previous_metadata_location:
properties[PROP_PREVIOUS_METADATA_LOCATION] = previous_metadata_location
return properties
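# For example (paths are hypothetical),
#     _construct_parameters(
#         "s3://bucket/tbl/metadata/00001-abc.metadata.json",
#         "s3://bucket/tbl/metadata/00000-abc.metadata.json",
#     )
# produces:
#     {
#         "EXTERNAL": "TRUE",
#         "table_type": "ICEBERG",
#         "metadata_location": "s3://bucket/tbl/metadata/00001-abc.metadata.json",
#         "previous_metadata_location": "s3://bucket/tbl/metadata/00000-abc.metadata.json",
#     }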
def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveDatabase:
params = {}
for key, value in properties.items():
if key == COMMENT:
database.description = value
elif key == LOCATION:
database.locationUri = value
else:
params[key] = value
database.parameters = params
return database
HIVE_PRIMITIVE_TYPES = {
BooleanType: "boolean",
IntegerType: "int",
LongType: "bigint",
FloatType: "float",
DoubleType: "double",
DateType: "date",
TimeType: "string",
TimestampType: "timestamp",
TimestamptzType: "timestamp with local time zone",
StringType: "string",
UUIDType: "string",
BinaryType: "binary",
FixedType: "binary",
}
class SchemaToHiveConverter(SchemaVisitor[str]):
hive2_compatible: bool
def __init__(self, hive2_compatible: bool):
self.hive2_compatible = hive2_compatible
def schema(self, schema: Schema, struct_result: str) -> str:
return struct_result
def struct(self, struct: StructType, field_results: List[str]) -> str:
return f"struct<{','.join(field_results)}>"
def field(self, field: NestedField, field_result: str) -> str:
return f"{field.name}:{field_result}"
def list(self, list_type: ListType, element_result: str) -> str:
return f"array<{element_result}>"
def map(self, map_type: MapType, key_result: str, value_result: str) -> str:
# Key has to be primitive for Hive
return f"map<{key_result},{value_result}>"
def primitive(self, primitive: PrimitiveType) -> str:
if isinstance(primitive, DecimalType):
return f"decimal({primitive.precision},{primitive.scale})"
elif self.hive2_compatible and isinstance(primitive, TimestamptzType):
# Hive2 doesn't support timestamp with local time zone
return "timestamp"
else:
return HIVE_PRIMITIVE_TYPES[type(primitive)]
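# A quick sketch of the conversion; the field names and ids below are made up:
#
#     schema = Schema(
#         NestedField(field_id=1, name="id", field_type=LongType(), required=True),
#         NestedField(field_id=2, name="ts", field_type=TimestamptzType(), required=False),
#     )
#     visit(schema, SchemaToHiveConverter(hive2_compatible=False))
#     # -> 'struct<id:bigint,ts:timestamp with local time zone>'
#     visit(schema, SchemaToHiveConverter(hive2_compatible=True))
#     # -> 'struct<id:bigint,ts:timestamp>'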
class HiveCatalog(MetastoreCatalog):
_client: _HiveClient
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = _HiveClient(properties["uri"], properties.get("ugi"))
self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME)
self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME)
        # Retries is a count, so read it as an int rather than a float.
        self._lock_check_retries = property_as_int(
            properties,
            LOCK_CHECK_RETRIES,
            DEFAULT_LOCK_CHECK_RETRIES,
        )
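    # Typical construction; the URI below is an assumption, not a built-in default:
    #
    #     catalog = HiveCatalog("hive", uri="thrift://localhost:9083")
    #     catalog = HiveCatalog("hive", uri="thrift://localhost:9083", ugi="user:group")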
def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
properties: Dict[str, str] = table.parameters
if TABLE_TYPE not in properties:
raise NoSuchPropertyException(
f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}"
)
table_type = properties[TABLE_TYPE]
if table_type.lower() != ICEBERG:
raise NoSuchIcebergTableError(
f"Property table_type is {table_type}, expected {ICEBERG}: {table.dbName}.{table.tableName}"
)
if prop_metadata_location := properties.get(METADATA_LOCATION):
metadata_location = prop_metadata_location
else:
raise NoSuchPropertyException(f"Table property {METADATA_LOCATION} is missing")
io = self._load_file_io(location=metadata_location)
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
identifier=(self.name, table.dbName, table.tableName),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
catalog=self,
)
def _convert_iceberg_into_hive(self, table: Table) -> HiveTable:
identifier_tuple = self.identifier_to_tuple_without_catalog(table.identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
current_time_millis = int(time.time() * 1000)
return HiveTable(
dbName=database_name,
tableName=table_name,
owner=table.properties[OWNER] if table.properties and OWNER in table.properties else getpass.getuser(),
createTime=current_time_millis // 1000,
lastAccessTime=current_time_millis // 1000,
sd=_construct_hive_storage_descriptor(
table.schema(),
table.location(),
property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
),
tableType=EXTERNAL_TABLE,
parameters=_construct_parameters(table.metadata_location),
)
def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None:
try:
open_client.create_table(hive_table)
except AlreadyExistsException as e:
raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e
def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable:
try:
return open_client.get_table(dbname=database_name, tbl_name=table_name)
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exists: {table_name}") from e
def create_table(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> Table:
"""Create a table.
Args:
identifier: Table identifier.
schema: Table's schema.
location: Location for the table. Optional Argument.
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
Returns:
Table: the created table instance.
Raises:
            TableAlreadyExistsError: If a table with the given name already exists.
ValueError: If the identifier is invalid.
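
        Example:
            A minimal sketch; the identifier and schema below are illustrative:

                from pyiceberg.schema import Schema
                from pyiceberg.types import LongType, NestedField

                schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=True))
                table = catalog.create_table("default.events", schema=schema)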
"""
properties = {**DEFAULT_PROPERTIES, **properties}
staged_table = self._create_staged_table(
identifier=identifier,
schema=schema,
location=location,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
)
database_name, table_name = self.identifier_to_database_and_table(identifier)
self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
tbl = self._convert_iceberg_into_hive(staged_table)
with self._client as open_client:
self._create_hive_table(open_client, tbl)
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
return self._convert_hive_into_iceberg(hive_table)
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.
Args:
            identifier: Table identifier for the table.
            metadata_location: The location of the existing metadata file.
        Returns:
            Table: The newly registered table.
        Raises:
            TableAlreadyExistsError: If the table already exists.
"""
raise NotImplementedError
def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest:
lock_component: LockComponent = LockComponent(
level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True
)
lock_request: LockRequest = LockRequest(component=[lock_component], user=getpass.getuser(), hostname=socket.gethostname())
return lock_request
def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
@retry(
retry=retry_if_exception_type(WaitingForLockException),
wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
stop=stop_after_attempt(self._lock_check_retries),
reraise=True,
)
def _do_wait_for_lock() -> LockResponse:
response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
if response.state == LockState.ACQUIRED:
return response
elif response.state == LockState.WAITING:
msg = f"Wait on lock for {database_name}.{table_name}"
logger.warning(msg)
raise WaitingForLockException(msg)
else:
raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")
return _do_wait_for_lock()
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Args:
table_request (CommitTableRequest): The table requests to be carried out.
Returns:
CommitTableResponse: The updated metadata.
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
# commit to hive
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
with self._client as open_client:
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
try:
if lock.state != LockState.ACQUIRED:
if lock.state == LockState.WAITING:
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
else:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
hive_table: Optional[HiveTable]
current_table: Optional[Table]
try:
hive_table = self._get_hive_table(open_client, database_name, table_name)
current_table = self._convert_hive_into_iceberg(hive_table)
except NoSuchTableError:
hive_table = None
current_table = None
updated_staged_table = self._update_and_stage_table(current_table, table_request)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
self._write_metadata(
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
metadata_path=updated_staged_table.metadata_location,
)
if hive_table and current_table:
# Table exists, update it.
hive_table.parameters = _construct_parameters(
metadata_location=updated_staged_table.metadata_location,
previous_metadata_location=current_table.metadata_location,
)
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
else:
# Table does not exist, create it.
hive_table = self._convert_iceberg_into_hive(
StagedTable(
identifier=(self.name, database_name, table_name),
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location,
io=updated_staged_table.io,
catalog=self,
)
)
self._create_hive_table(open_client, hive_table)
except WaitingForLockException as e:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))
return CommitTableResponse(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)
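    # In practice this commit path is reached through the Table/Transaction API
    # rather than called directly; a hedged sketch (identifier and property made up):
    #
    #     table = catalog.load_table("default.events")
    #     with table.transaction() as tx:
    #         tx.set_properties(owner="analytics")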
def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and return the table instance.
        You can also use this method to check for table existence using 'try catalog.load_table() except NoSuchTableError'.
Note: This method doesn't scan data stored in the table.
Args:
identifier: Table identifier.
Returns:
Table: the table instance with its metadata.
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
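
        Example:
            A minimal sketch; the identifier below is illustrative:

                table = catalog.load_table("default.events")
                print(table.metadata_location)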
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
with self._client as open_client:
hive_table = self._get_hive_table(open_client, database_name, table_name)
return self._convert_hive_into_iceberg(hive_table)
def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.
Args:
identifier: Table identifier.
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
open_client.drop_table(dbname=database_name, name=table_name, deleteData=False)
except NoSuchObjectException as e:
            # Hive throws the same error when the namespace doesn't exist.
            raise NoSuchTableError(f"Table does not exist: {table_name}") from e
def purge_table(self, identifier: Union[str, Identifier]) -> None:
        # This requires traversing the reachability set and dropping all the data files.
raise NotImplementedError("Not yet implemented")
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
"""Rename a fully classified table name.
Args:
from_identifier: Existing table identifier.
to_identifier: New table identifier.
Returns:
Table: the updated table instance with its metadata.
Raises:
            ValueError: When the from table identifier is invalid.
NoSuchTableError: When a table with the name does not exist.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
with self._client as open_client:
tbl = open_client.get_table(dbname=from_database_name, tbl_name=from_table_name)
tbl.dbName = to_database_name
tbl.tableName = to_table_name
open_client.alter_table(dbname=from_database_name, tbl_name=from_table_name, new_tbl=tbl)
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exist: {from_table_name}") from e
except InvalidOperationException as e:
raise NoSuchNamespaceError(f"Database does not exists: {to_database_name}") from e
return self.load_table(to_identifier)
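    # Illustrative only (identifiers are made up; the target namespace must already exist):
    #
    #     renamed = catalog.rename_table("default.events", "archive.events")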
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
Args:
namespace: Namespace identifier.
properties: A string dictionary of properties for the given namespace.
Raises:
ValueError: If the identifier is invalid.
            NamespaceAlreadyExistsError: If a namespace with the given name already exists.
"""
database_name = self.identifier_to_database(namespace)
hive_database = HiveDatabase(name=database_name, parameters=properties)
try:
with self._client as open_client:
open_client.create_database(_annotate_namespace(hive_database, properties))
except AlreadyExistsException as e:
raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e
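    # Illustrative only; "comment" and "location" map onto the Hive database's
    # description and locationUri (the S3 path below is made up):
    #
    #     catalog.create_namespace(
    #         "analytics",
    #         properties={"comment": "analytics team db", "location": "s3://bucket/analytics"},
    #     )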
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
"""Drop a namespace.
Args:
namespace: Namespace identifier.
Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
NamespaceNotEmptyError: If the namespace is not empty.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
try:
with self._client as open_client:
open_client.drop_database(database_name, deleteData=False, cascade=False)
except InvalidOperationException as e:
raise NamespaceNotEmptyError(f"Database {database_name} is not empty") from e
except MetaException as e:
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
"""List tables under the given namespace in the catalog (including non-Iceberg tables).
When the database doesn't exist, it will just return an empty list.
Args:
namespace: Database to list.
Returns:
List[Identifier]: list of table identifiers.
Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
with self._client as open_client:
return [(database_name, table_name) for table_name in open_client.get_all_tables(db_name=database_name)]
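    # Illustrative only; this returns every table in the database, Iceberg or not:
    #
    #     catalog.list_tables("default")
    #     # -> [("default", "events"), ("default", "taxis")]  (example output)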
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
Returns:
List[Identifier]: a List of namespace identifiers.
"""
        # Hierarchical namespaces are not supported. Return an empty list.
if namespace:
return []
with self._client as open_client:
return list(map(self.identifier_to_tuple, open_client.get_all_databases()))
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
"""Get properties for a namespace.
Args:
namespace: Namespace identifier.
Returns:
Properties: Properties for the given namespace.
Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
try:
with self._client as open_client:
database = open_client.get_database(name=database_name)
properties = database.parameters
properties[LOCATION] = database.locationUri
if comment := database.description:
properties[COMMENT] = comment
return properties
except NoSuchObjectException as e:
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e
def update_namespace_properties(
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
"""Remove provided property keys and update properties for a namespace.
Args:
namespace: Namespace identifier.
removals: Set of property keys that need to be removed. Optional Argument.
updates: Properties to be updated for the given namespace.
Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
ValueError: If removals and updates have overlapping keys.
"""
self._check_for_overlap(updates=updates, removals=removals)
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
with self._client as open_client:
try:
database = open_client.get_database(database_name)
parameters = database.parameters
except NoSuchObjectException as e:
raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e
removed: Set[str] = set()
updated: Set[str] = set()
if removals:
for key in removals:
if key in parameters:
parameters[key] = None
removed.add(key)
if updates:
for key, value in updates.items():
parameters[key] = value
updated.add(key)
open_client.alter_database(database_name, _annotate_namespace(database, parameters))
expected_to_change = (removals or set()).difference(removed)
return PropertiesUpdateSummary(removed=list(removed or []), updated=list(updated or []), missing=list(expected_to_change))
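    # Illustrative only (keys and values are made up):
    #
    #     summary = catalog.update_namespace_properties(
    #         "analytics",
    #         removals={"obsolete-key"},
    #         updates={"owner": "data-eng"},
    #     )
    #     # summary.removed, summary.updated and summary.missing report what changed.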