| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| from typing import ( |
| TYPE_CHECKING, |
| Any, |
| Dict, |
| List, |
| Optional, |
| Set, |
| Tuple, |
| Union, |
| cast, |
| ) |
| |
| import boto3 |
| from botocore.config import Config |
| from mypy_boto3_glue.client import GlueClient |
| from mypy_boto3_glue.type_defs import ( |
| ColumnTypeDef, |
| DatabaseInputTypeDef, |
| DatabaseTypeDef, |
| StorageDescriptorTypeDef, |
| TableInputTypeDef, |
| TableTypeDef, |
| ) |
| |
| from pyiceberg.catalog import ( |
| BOTOCORE_SESSION, |
| EXTERNAL_TABLE, |
| ICEBERG, |
| LOCATION, |
| METADATA_LOCATION, |
| PREVIOUS_METADATA_LOCATION, |
| TABLE_TYPE, |
| MetastoreCatalog, |
| PropertiesUpdateSummary, |
| ) |
| from pyiceberg.exceptions import ( |
| CommitFailedException, |
| NamespaceAlreadyExistsError, |
| NamespaceNotEmptyError, |
| NoSuchIcebergTableError, |
| NoSuchNamespaceError, |
| NoSuchPropertyException, |
| NoSuchTableError, |
| TableAlreadyExistsError, |
| ) |
| from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN |
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec |
| from pyiceberg.schema import Schema, SchemaVisitor, visit |
| from pyiceberg.serializers import FromInputFile |
| from pyiceberg.table import ( |
| CommitTableResponse, |
| Table, |
| ) |
| from pyiceberg.table.metadata import TableMetadata |
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder |
| from pyiceberg.table.update import ( |
| TableRequirement, |
| TableUpdate, |
| ) |
| from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties |
| from pyiceberg.types import ( |
| BinaryType, |
| BooleanType, |
| DateType, |
| DecimalType, |
| DoubleType, |
| FixedType, |
| FloatType, |
| IntegerType, |
| ListType, |
| LongType, |
| MapType, |
| NestedField, |
| PrimitiveType, |
| StringType, |
| StructType, |
| TimestampType, |
| TimestamptzType, |
| TimeType, |
| UUIDType, |
| ) |
| from pyiceberg.utils.properties import get_first_property_value, property_as_bool |
| |
| if TYPE_CHECKING: |
| import pyarrow as pa |
| |
| |
# There is one Glue metastore (Glue Data Catalog) per AWS account and AWS region. By default, GlueCatalog uses the
# metastore selected by the user's default AWS client credentials and region setup. Set the glue.id catalog property to
# point at a Glue catalog in a different AWS account; the Glue catalog ID is that account's numeric AWS account ID.
# When set, it is added as CatalogId to every Glue client call (see _register_glue_catalog_id_with_glue_client).
GLUE_ID = "glue.id"

# Whether Glue should skip archiving the old table version when a commit creates a new one (sent as SkipArchive on
# UpdateTable). By default Glue archives every old table version after an UpdateTable call, but it only keeps a
# limited number of archived versions (the limit can be increased). Streaming workloads with many commits are
# therefore recommended to skip archiving; this catalog defaults to skipping.
GLUE_SKIP_ARCHIVE = "glue.skip-archive"
GLUE_SKIP_ARCHIVE_DEFAULT = True

# Alternative endpoint URL for the Glue service. Use it to point GlueCatalog at any Glue-compatible metastore service
# that is served from a different endpoint.
GLUE_CATALOG_ENDPOINT = "glue.endpoint"

# Keys written into each Glue column's Parameters: the Iceberg field ID, whether the field is optional, and whether
# the column comes from the current schema ("true") or only from an older schema ("false").
ICEBERG_FIELD_ID = "iceberg.field.id"
ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
ICEBERG_FIELD_CURRENT = "iceberg.field.current"

# Catalog properties used to build the boto3 session and the Glue client. The region and credential keys are looked
# up before the generic AWS FileIO properties (AWS_REGION, AWS_ACCESS_KEY_ID, ...).
GLUE_PROFILE_NAME = "glue.profile-name"
GLUE_REGION = "glue.region"
GLUE_ACCESS_KEY_ID = "glue.access-key-id"
GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key"
GLUE_SESSION_TOKEN = "glue.session-token"
GLUE_MAX_RETRIES = "glue.max-retries"
GLUE_RETRY_MODE = "glue.retry-mode"

# Default for glue.max-retries (passed to botocore as max_attempts), and the values accepted for glue.retry-mode;
# any other retry mode value falls back to the standard mode.
MAX_RETRIES = 10
STANDARD_RETRY_MODE = "standard"
ADAPTIVE_RETRY_MODE = "adaptive"
LEGACY_RETRY_MODE = "legacy"
EXISTING_RETRY_MODES = [STANDARD_RETRY_MODE, ADAPTIVE_RETRY_MODE, LEGACY_RETRY_MODE]
| |
| |
def _construct_parameters(
    metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
) -> Properties:
    """Build the Glue table ``Parameters`` that point a Glue table at an Iceberg metadata file.

    Parameters already present on ``glue_table`` are carried over. The Iceberg table type and the
    metadata location are then set, and the previous metadata location is recorded when one is given.

    Args:
        metadata_location: Location of the metadata file the Glue table should point to.
        glue_table: Current Glue table definition whose parameters should be preserved, if any.
        prev_metadata_location: Metadata location being replaced, if any.

    Returns:
        A new parameters dictionary. ``glue_table`` itself is left unmodified.
    """
    # Copy the existing parameters: updating the dict returned by ``get`` directly would silently
    # mutate the caller's Glue table definition.
    new_parameters: Properties = dict(glue_table.get("Parameters", {})) if glue_table else {}
    new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})
    if prev_metadata_location:
        new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location
    return new_parameters
| |
| |
# Glue column type string for each Iceberg primitive type class. Lookups use the exact class of the
# primitive (``type(primitive)``). DecimalType is absent because its Glue type depends on the
# instance's precision and scale. Primitive types missing from this map are rendered with ``str(...)``
# by _IcebergSchemaToGlueType.
GLUE_PRIMITIVE_TYPES = {
    BooleanType: "boolean",
    IntegerType: "int",
    LongType: "bigint",
    FloatType: "float",
    DoubleType: "double",
    DateType: "date",
    # Time and UUID values are written as plain string columns.
    TimeType: "string",
    StringType: "string",
    UUIDType: "string",
    # Both timestamp flavours map to the same Glue type.
    TimestampType: "timestamp",
    TimestamptzType: "timestamp",
    FixedType: "binary",
    BinaryType: "binary",
}
| |
| |
class _IcebergSchemaToGlueType(SchemaVisitor[str]):
    """Schema visitor that renders an Iceberg type as a Glue column type string.

    Nested types are rendered recursively, e.g. ``struct<id:bigint,tags:array<string>>``.
    """

    def schema(self, schema: Schema, struct_result: str) -> str:
        # A schema renders exactly like its top-level struct.
        return struct_result

    def struct(self, struct: StructType, field_results: List[str]) -> str:
        members = ",".join(field_results)
        return f"struct<{members}>"

    def field(self, field: NestedField, field_result: str) -> str:
        # Struct members are written as ``name:type``.
        name = field.name
        return f"{name}:{field_result}"

    def list(self, list_type: ListType, element_result: str) -> str:
        element = element_result
        return f"array<{element}>"

    def map(self, map_type: MapType, key_result: str, value_result: str) -> str:
        key, value = key_result, value_result
        return f"map<{key},{value}>"

    def primitive(self, primitive: PrimitiveType) -> str:
        if isinstance(primitive, DecimalType):
            # The decimal type string depends on this instance's precision and scale.
            return f"decimal({primitive.precision},{primitive.scale})"
        glue_type = GLUE_PRIMITIVE_TYPES.get(type(primitive))
        # Primitives without a mapping fall back to their Iceberg string form.
        return str(primitive) if glue_type is None else glue_type
| |
| |
def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]:
    """Build the Glue column list for every column name that appears in any schema of the table.

    Columns of the current schema come first and are flagged as current. Columns that appear only in
    older schemas follow and are flagged as not current. When a name appears more than once, only its
    first occurrence is kept.

    Args:
        metadata: Table metadata holding the current and historical schemas.

    Returns:
        Glue column definitions with Iceberg field id/optional/current markers in ``Parameters``.
    """
    columns_by_name: Dict[str, ColumnTypeDef] = {}

    def _record(field: NestedField, is_current: bool) -> None:
        # The first definition of a name wins; the current schema is visited first.
        if field.name in columns_by_name:
            return
        column = cast(
            ColumnTypeDef,
            {
                "Name": field.name,
                "Type": visit(field.field_type, _IcebergSchemaToGlueType()),
                "Parameters": {
                    ICEBERG_FIELD_ID: str(field.field_id),
                    ICEBERG_FIELD_OPTIONAL: str(field.optional).lower(),
                    ICEBERG_FIELD_CURRENT: str(is_current).lower(),
                },
            },
        )
        if field.doc:
            column["Comment"] = field.doc
        columns_by_name[field.name] = column

    current_schema_id = metadata.current_schema_id
    current_schema = metadata.schema_by_id(current_schema_id)
    if current_schema:
        for field in current_schema.columns:
            _record(field, True)

    older_schemas = (schema for schema in metadata.schemas if schema.schema_id != current_schema_id)
    for schema in older_schemas:
        for field in schema.columns:
            _record(field, False)

    return list(columns_by_name.values())
| |
| |
def _construct_table_input(
    table_name: str,
    metadata_location: str,
    properties: Properties,
    metadata: TableMetadata,
    glue_table: Optional[TableTypeDef] = None,
    prev_metadata_location: Optional[str] = None,
) -> TableInputTypeDef:
    """Assemble the Glue ``TableInput`` describing an Iceberg table.

    Args:
        table_name: Glue table name.
        metadata_location: Metadata file the Glue table should point to.
        properties: Table properties; only a ``"Description"`` entry is used, as the Glue description.
        metadata: Iceberg table metadata, used for the columns and the table location.
        glue_table: Existing Glue table whose parameters should be carried over, if any.
        prev_metadata_location: Metadata location being replaced, if any.

    Returns:
        The ``TableInput`` for a Glue create/update call.
    """
    parameters = _construct_parameters(metadata_location, glue_table, prev_metadata_location)
    storage_descriptor: StorageDescriptorTypeDef = {
        "Columns": _to_columns(metadata),
        "Location": metadata.location,
    }
    table_input: TableInputTypeDef = {
        "Name": table_name,
        "TableType": EXTERNAL_TABLE,
        "Parameters": parameters,
        "StorageDescriptor": storage_descriptor,
    }

    # Only a property literally named "Description" becomes the Glue table description.
    if "Description" in properties:
        table_input["Description"] = properties["Description"]
    return table_input
| |
| |
def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef) -> TableInputTypeDef:
    """Build a Glue ``TableInput`` that recreates ``glue_table`` under a new name.

    The new definition reuses the source table's type, owner, parameters, storage descriptor and
    description, so it keeps pointing at the same Iceberg metadata.

    Args:
        to_table_name: Name for the new Glue table.
        glue_table: Definition of the existing Glue table.

    Returns:
        The ``TableInput`` used to create the renamed table.
    """
    source_table_type = glue_table["TableType"]
    assert source_table_type
    renamed: TableInputTypeDef = {"Name": to_table_name, "TableType": source_table_type}

    if "Owner" in glue_table:
        renamed["Owner"] = glue_table["Owner"]
    if "Parameters" in glue_table:
        renamed["Parameters"] = glue_table["Parameters"]
    if "StorageDescriptor" in glue_table:
        # The StorageDescriptor returned by Glue is not typed identically to the input one (column
        # types differ), but passing it back works, so only the static type is adjusted here.
        renamed["StorageDescriptor"] = cast(StorageDescriptorTypeDef, glue_table["StorageDescriptor"])
    if "Description" in glue_table:
        renamed["Description"] = glue_table["Description"]

    return renamed
| |
| |
def _construct_database_input(database_name: str, properties: Properties) -> DatabaseInputTypeDef:
    """Translate namespace properties into a Glue ``DatabaseInput``.

    The ``"Description"`` and ``location`` properties map to the dedicated Glue fields. Every other
    property is stored in the database ``Parameters``, which is always set, even when empty.

    Args:
        database_name: Name of the Glue database.
        properties: Namespace properties.

    Returns:
        The ``DatabaseInput`` for a Glue create/update database call.
    """
    database_input: DatabaseInputTypeDef = {"Name": database_name}
    extra_parameters: Dict[str, str] = {}
    for key, value in properties.items():
        if key == LOCATION:
            database_input["LocationUri"] = value
        elif key == "Description":
            database_input["Description"] = value
        else:
            extra_parameters[key] = value
    database_input["Parameters"] = extra_parameters
    return database_input
| |
| |
def _register_glue_catalog_id_with_glue_client(glue: GlueClient, glue_catalog_id: str) -> None:
    """Make every Glue client call target the given Glue catalog (AWS account ID).

    ``CatalogId`` is optional on Glue calls, and boto3 does not accept ``None`` for a missing
    parameter. So instead of passing it at each call site, a ``provide-client-params`` event handler
    fills it in whenever the caller has not supplied one.

    Args:
        glue: The Glue client to hook.
        glue_catalog_id: The Glue catalog ID (numeric AWS account ID) to inject.
    """

    def _inject_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:
        # An explicitly passed CatalogId is left untouched.
        params.setdefault("CatalogId", glue_catalog_id)

    glue.meta.events.register("provide-client-params.glue", _inject_catalog_id)
| |
| |
class GlueCatalog(MetastoreCatalog):
    """Iceberg catalog backed by the AWS Glue Data Catalog.

    Each Glue database is exposed as a single-level namespace. Each Iceberg table is a Glue table
    whose ``Parameters`` store the Iceberg table type and the location of the current metadata file.
    """

    def __init__(self, name: str, **properties: Any):
        """Initialize the catalog and create its boto3 Glue client.

        Args:
            name: Name of the catalog.
            **properties: Catalog properties.
                - The ``glue.*`` profile, region and credential keys are looked up before the generic
                  AWS FileIO keys.
                - ``glue.endpoint``, ``glue.max-retries`` and ``glue.retry-mode`` configure the client.
                - ``glue.id`` selects the Glue catalog (AWS account) that every call targets.

        Raises:
            ValueError: If ``glue.max-retries`` cannot be converted to an integer.
        """
        super().__init__(name, **properties)

        retry_mode_prop_value = get_first_property_value(properties, GLUE_RETRY_MODE)

        session = boto3.Session(
            profile_name=properties.get(GLUE_PROFILE_NAME),
            region_name=get_first_property_value(properties, GLUE_REGION, AWS_REGION),
            botocore_session=properties.get(BOTOCORE_SESSION),
            aws_access_key_id=get_first_property_value(properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
            aws_secret_access_key=get_first_property_value(properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
            aws_session_token=get_first_property_value(properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN),
        )
        self.glue: GlueClient = session.client(
            "glue",
            endpoint_url=properties.get(GLUE_CATALOG_ENDPOINT),
            config=Config(
                retries={
                    # Catalog properties usually arrive as strings (e.g. from a config file), but
                    # botocore needs an integer attempt count, so coerce explicitly.
                    "max_attempts": int(properties.get(GLUE_MAX_RETRIES, MAX_RETRIES)),
                    # Unset or unrecognised retry modes fall back to the standard mode.
                    "mode": retry_mode_prop_value if retry_mode_prop_value in EXISTING_RETRY_MODES else STANDARD_RETRY_MODE,
                }
            ),
        )

        if glue_catalog_id := properties.get(GLUE_ID):
            _register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)

    def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
        """Load the Iceberg table that a Glue table definition points to.

        Args:
            glue_table: Glue table definition.

        Returns:
            Table: The Iceberg table read from the metadata file referenced by the Glue table.

        Raises:
            NoSuchPropertyException: If the table type or metadata location parameter is missing.
            NoSuchIcebergTableError: If the Glue table is not an Iceberg table.
        """
        assert glue_table["DatabaseName"]
        database_name = glue_table["DatabaseName"]
        table_name = glue_table["Name"]
        # Glue tables do not always carry Parameters (__is_iceberg_table reads them defensively too).
        # A missing or empty dict is reported as a missing property rather than a KeyError or
        # AssertionError, so that callers such as rename_table can handle it.
        properties: Properties = glue_table.get("Parameters", {})

        if TABLE_TYPE not in properties:
            raise NoSuchPropertyException(
                f"Property {TABLE_TYPE} missing, could not determine type: {database_name}.{table_name}"
            )
        glue_table_type = properties[TABLE_TYPE]

        if glue_table_type.lower() != ICEBERG:
            raise NoSuchIcebergTableError(
                f"Property table_type is {glue_table_type}, expected {ICEBERG}: {database_name}.{table_name}"
            )

        if METADATA_LOCATION not in properties:
            raise NoSuchPropertyException(
                f"Table property {METADATA_LOCATION} is missing, cannot find metadata for: {database_name}.{table_name}"
            )
        metadata_location = properties[METADATA_LOCATION]

        io = self._load_file_io(location=metadata_location)
        file = io.new_input(metadata_location)
        metadata = FromInputFile.table_metadata(file)
        return Table(
            identifier=(database_name, table_name),
            metadata=metadata,
            metadata_location=metadata_location,
            io=self._load_file_io(metadata.properties, metadata_location),
            catalog=self,
        )

    def _create_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef) -> None:
        """Create a Glue table.

        Raises:
            TableAlreadyExistsError: If the Glue table already exists.
            NoSuchNamespaceError: If the Glue database does not exist.
        """
        try:
            self.glue.create_table(DatabaseName=database_name, TableInput=table_input)
        except self.glue.exceptions.AlreadyExistsException as e:
            raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

    def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
        """Update a Glue table, conditioned on its current Glue table version.

        Raises:
            NoSuchTableError: If the Glue table does not exist.
            CommitFailedException: If Glue reports a concurrent modification of ``version_id``.
        """
        try:
            self.glue.update_table(
                DatabaseName=database_name,
                TableInput=table_input,
                SkipArchive=property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
                VersionId=version_id,
            )
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name} (Glue table version {version_id})") from e
        except self.glue.exceptions.ConcurrentModificationException as e:
            raise CommitFailedException(
                f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
            ) from e

    def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
        """Fetch a Glue table definition.

        Raises:
            NoSuchTableError: If the Glue table does not exist.
        """
        try:
            load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
            return load_table_response["Table"]
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

    def _list_all_glue_tables(self, database_name: str) -> List[TableTypeDef]:
        """Return every Glue table (Iceberg or not) in a Glue database, following pagination.

        Raises:
            NoSuchNamespaceError: If the Glue database does not exist.
        """
        table_list: List[TableTypeDef] = []
        next_token: Optional[str] = None
        try:
            while True:
                table_list_response = (
                    self.glue.get_tables(DatabaseName=database_name)
                    if not next_token
                    else self.glue.get_tables(DatabaseName=database_name, NextToken=next_token)
                )
                table_list.extend(table_list_response["TableList"])
                next_token = table_list_response.get("NextToken")
                if not next_token:
                    break
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
        return table_list

    def create_table(
        self,
        identifier: Union[str, Identifier],
        schema: Union[Schema, "pa.Schema"],
        location: Optional[str] = None,
        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
        sort_order: SortOrder = UNSORTED_SORT_ORDER,
        properties: Properties = EMPTY_DICT,
    ) -> Table:
        """
        Create an Iceberg table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            TableAlreadyExistsError: If a table with the name already exists.
            NoSuchNamespaceError: If the Glue database does not exist.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        """
        staged_table = self._create_staged_table(
            identifier=identifier,
            schema=schema,
            location=location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )
        database_name, table_name = self.identifier_to_database_and_table(identifier)

        self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
        table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
        self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)

        return self.load_table(identifier=identifier)

    def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
        """Register a new table using existing metadata.

        Args:
            identifier Union[str, Identifier]: Table identifier for the table
            metadata_location str: The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
            NoSuchNamespaceError: If the Glue database does not exist.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier)
        properties = EMPTY_DICT
        io = self._load_file_io(location=metadata_location)
        file = io.new_input(metadata_location)
        metadata = FromInputFile.table_metadata(file)
        table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
        self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
        return self.load_table(identifier=identifier)

    def commit_table(
        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
    ) -> CommitTableResponse:
        """Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
        """
        table_identifier = table.name()
        database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)

        current_glue_table: Optional[TableTypeDef]
        glue_table_version_id: Optional[str]
        current_table: Optional[Table]
        try:
            current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
            glue_table_version_id = current_glue_table.get("VersionId")
            current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
        except NoSuchTableError:
            current_glue_table = None
            glue_table_version_id = None
            current_table = None

        updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
        if current_table and updated_staged_table.metadata == current_table.metadata:
            # no changes, do nothing
            return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
        self._write_metadata(
            metadata=updated_staged_table.metadata,
            io=updated_staged_table.io,
            metadata_path=updated_staged_table.metadata_location,
        )

        if current_table:
            # table exists, update the table
            if not glue_table_version_id:
                raise CommitFailedException(
                    f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
                )

            # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
            # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
            update_table_input = _construct_table_input(
                table_name=table_name,
                metadata_location=updated_staged_table.metadata_location,
                properties=updated_staged_table.properties,
                metadata=updated_staged_table.metadata,
                glue_table=current_glue_table,
                prev_metadata_location=current_table.metadata_location,
            )
            self._update_glue_table(
                database_name=database_name,
                table_name=table_name,
                table_input=update_table_input,
                version_id=glue_table_version_id,
            )
        else:
            # table does not exist, create the table
            create_table_input = _construct_table_input(
                table_name=table_name,
                metadata_location=updated_staged_table.metadata_location,
                properties=updated_staged_table.properties,
                metadata=updated_staged_table.metadata,
            )
            self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

        return CommitTableResponse(
            metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
        )

    def load_table(self, identifier: Union[str, Identifier]) -> Table:
        """Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using
        'try catalog.load_table() except NoSuchTableError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier: Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

        return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))

    def drop_table(self, identifier: Union[str, Identifier]) -> None:
        """Drop a table.

        Args:
            identifier: Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        """
        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
        try:
            self.glue.delete_table(DatabaseName=database_name, Name=table_name)
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
        """Rename a fully classified table name.

        This method can only rename Iceberg tables in AWS Glue. Glue has no rename operation, so the
        table is recreated under the new name and the old Glue table is then dropped.

        Args:
            from_identifier: Existing table identifier.
            to_identifier: New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            ValueError: When from table identifier is invalid, or when the old table could not be
                dropped after the new one was created.
            NoSuchTableError: When a table with the name does not exist.
            NoSuchIcebergTableError: When from table is not a valid iceberg table.
            NoSuchPropertyException: When from table miss some required properties.
            NoSuchNamespaceError: When the destination namespace doesn't exist.
        """
        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
        to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
        glue_table = self._get_glue_table(database_name=from_database_name, table_name=from_table_name)

        try:
            # verify that from_identifier is a valid iceberg table
            self._convert_glue_to_iceberg(glue_table=glue_table)
        except NoSuchPropertyException as e:
            raise NoSuchPropertyException(
                f"Failed to rename table {from_database_name}.{from_table_name} since it is missing required properties"
            ) from e
        except NoSuchIcebergTableError as e:
            raise NoSuchIcebergTableError(
                f"Failed to rename table {from_database_name}.{from_table_name} since it is not a valid iceberg table"
            ) from e

        rename_table_input = _construct_rename_table_input(to_table_name=to_table_name, glue_table=glue_table)
        self._create_glue_table(database_name=to_database_name, table_name=to_table_name, table_input=rename_table_input)

        try:
            self.drop_table(from_identifier)
        except Exception as e:
            log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. "

            try:
                self.drop_table(to_identifier)
                log_message += f"Rolled back table creation for {to_database_name}.{to_table_name}."
            except Exception:
                # Any rollback failure (not only NoSuchTableError) must still surface the original
                # drop failure together with the manual-cleanup instruction.
                log_message += (
                    f"Failed to roll back table creation for {to_database_name}.{to_table_name}. Please clean up manually"
                )

            raise ValueError(log_message) from e

        return self.load_table(to_identifier)

    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
        """Create a namespace in the catalog.

        Args:
            namespace: Namespace identifier.
            properties: A string dictionary of properties for the given namespace.

        Raises:
            ValueError: If the identifier is invalid.
            NamespaceAlreadyExistsError: If a namespace with the given name already exists.
        """
        database_name = self.identifier_to_database(namespace)
        try:
            self.glue.create_database(DatabaseInput=_construct_database_input(database_name, properties))
        except self.glue.exceptions.AlreadyExistsException as e:
            raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e

    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """Drop a namespace.

        A Glue namespace can only be dropped if it contains no tables at all, Iceberg or otherwise.

        Args:
            namespace: Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
            NamespaceNotEmptyError: If the namespace is not empty.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)

        # list_tables only reports Iceberg tables. AWS Glue's DeleteDatabase documentation states that
        # the tables of a deleted database are removed as well, so checking Iceberg tables alone would
        # silently delete any non-Iceberg tables. Check for every Glue table instead.
        if self._list_all_glue_tables(database_name):
            raise NamespaceNotEmptyError(f"Database {database_name} is not empty")

        try:
            self.glue.delete_database(Name=database_name)
        except self.glue.exceptions.EntityNotFoundException as e:
            # The database disappeared between the emptiness check and the delete.
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e

    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        """List Iceberg tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        return [
            (database_name, table["Name"])
            for table in self._list_all_glue_tables(database_name)
            if self.__is_iceberg_table(table)
        ]

    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

        Args:
            namespace: Parent namespace. Glue has no hierarchical namespaces, so a non-empty value
                always yields an empty list.

        Returns:
            List[Identifier]: a List of namespace identifiers.
        """
        # Hierarchical namespace is not supported. Return an empty list
        if namespace:
            return []

        database_list: List[DatabaseTypeDef] = []
        next_token: Optional[str] = None

        while True:
            databases_response = self.glue.get_databases() if not next_token else self.glue.get_databases(NextToken=next_token)
            database_list.extend(databases_response["DatabaseList"])
            next_token = databases_response.get("NextToken")
            if not next_token:
                break

        return [self.identifier_to_tuple(database["Name"]) for database in database_list]

    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
        """Get properties for a namespace.

        Args:
            namespace: Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        try:
            database_response = self.glue.get_database(Name=database_name)
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
        except self.glue.exceptions.InvalidInputException as e:
            raise NoSuchNamespaceError(f"Invalid input for namespace {database_name}") from e

        database = database_response["Database"]

        properties = dict(database.get("Parameters", {}))
        # Mirror _construct_database_input, which maps the LOCATION property to LocationUri.
        if "LocationUri" in database:
            properties[LOCATION] = database["LocationUri"]
        if "Description" in database:
            properties["Description"] = database["Description"]

        return properties

    def update_namespace_properties(
        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
    ) -> PropertiesUpdateSummary:
        """Remove provided property keys and updates properties for a namespace.

        Args:
            namespace: Namespace identifier.
            removals: Set of property keys that need to be removed. Optional Argument.
            updates: Properties to be updated for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
            ValueError: If removals and updates have overlapping keys.
        """
        current_properties = self.load_namespace_properties(namespace=namespace)
        properties_update_summary, updated_properties = self._get_updated_props_and_update_summary(
            current_properties=current_properties, removals=removals, updates=updates
        )

        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        self.glue.update_database(Name=database_name, DatabaseInput=_construct_database_input(database_name, updated_properties))

        return properties_update_summary

    def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
        raise NotImplementedError

    def drop_view(self, identifier: Union[str, Identifier]) -> None:
        raise NotImplementedError

    def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        raise NotImplementedError

    @staticmethod
    def __is_iceberg_table(table: TableTypeDef) -> bool:
        """Return whether a Glue table's ``table_type`` parameter marks it as an Iceberg table."""
        return table.get("Parameters", {}).get(TABLE_TYPE, "").lower() == ICEBERG