| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import uuid |
| from time import time |
| from typing import ( |
| TYPE_CHECKING, |
| Any, |
| Dict, |
| List, |
| Optional, |
| Set, |
| Tuple, |
| Union, |
| ) |
| |
| import boto3 |
| |
| from pyiceberg.catalog import ( |
| BOTOCORE_SESSION, |
| ICEBERG, |
| METADATA_LOCATION, |
| PREVIOUS_METADATA_LOCATION, |
| TABLE_TYPE, |
| MetastoreCatalog, |
| PropertiesUpdateSummary, |
| ) |
| from pyiceberg.exceptions import ( |
| ConditionalCheckFailedException, |
| GenericDynamoDbError, |
| NamespaceAlreadyExistsError, |
| NamespaceNotEmptyError, |
| NoSuchIcebergTableError, |
| NoSuchNamespaceError, |
| NoSuchPropertyException, |
| NoSuchTableError, |
| TableAlreadyExistsError, |
| ) |
| from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io |
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec |
| from pyiceberg.schema import Schema |
| from pyiceberg.serializers import FromInputFile |
| from pyiceberg.table import CommitTableResponse, Table |
| from pyiceberg.table.locations import load_location_provider |
| from pyiceberg.table.metadata import new_table_metadata |
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder |
| from pyiceberg.table.update import ( |
| TableRequirement, |
| TableUpdate, |
| ) |
| from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties |
| from pyiceberg.utils.properties import get_first_property_value |
| |
| if TYPE_CHECKING: |
| import pyarrow as pa |
| |
| DYNAMODB_CLIENT = "dynamodb" |
| |
| DYNAMODB_COL_IDENTIFIER = "identifier" |
| DYNAMODB_COL_NAMESPACE = "namespace" |
| DYNAMODB_COL_VERSION = "v" |
| DYNAMODB_COL_UPDATED_AT = "updated_at" |
| DYNAMODB_COL_CREATED_AT = "created_at" |
| DYNAMODB_NAMESPACE = "NAMESPACE" |
| DYNAMODB_NAMESPACE_GSI = "namespace-identifier" |
| DYNAMODB_PAY_PER_REQUEST = "PAY_PER_REQUEST" |
| |
| DYNAMODB_TABLE_NAME = "table-name" |
| DYNAMODB_TABLE_NAME_DEFAULT = "iceberg" |
| |
| PROPERTY_KEY_PREFIX = "p." |
| |
| ACTIVE = "ACTIVE" |
| ITEM = "Item" |
| |
| DYNAMODB_PROFILE_NAME = "dynamodb.profile-name" |
| DYNAMODB_REGION = "dynamodb.region" |
| DYNAMODB_ACCESS_KEY_ID = "dynamodb.access-key-id" |
| DYNAMODB_SECRET_ACCESS_KEY = "dynamodb.secret-access-key" |
| DYNAMODB_SESSION_TOKEN = "dynamodb.session-token" |
| |
| |
| class DynamoDbCatalog(MetastoreCatalog): |
| def __init__(self, name: str, **properties: str): |
| super().__init__(name, **properties) |
| |
| session = boto3.Session( |
| profile_name=properties.get(DYNAMODB_PROFILE_NAME), |
| region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION), |
| botocore_session=properties.get(BOTOCORE_SESSION), |
| aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), |
| aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), |
| aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN), |
| ) |
| self.dynamodb = session.client(DYNAMODB_CLIENT) |
| self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT) |
| self._ensure_catalog_table_exists_or_create() |
| |
| def _ensure_catalog_table_exists_or_create(self) -> None: |
| if self._dynamodb_table_exists(): |
| return None |
| |
| try: |
| self.dynamodb.create_table( |
| TableName=self.dynamodb_table_name, |
| AttributeDefinitions=CREATE_CATALOG_ATTRIBUTE_DEFINITIONS, |
| KeySchema=CREATE_CATALOG_KEY_SCHEMA, |
| GlobalSecondaryIndexes=CREATE_CATALOG_GLOBAL_SECONDARY_INDEXES, |
| BillingMode=DYNAMODB_PAY_PER_REQUEST, |
| ) |
| except ( |
| self.dynamodb.exceptions.ResourceInUseException, |
| self.dynamodb.exceptions.LimitExceededException, |
| self.dynamodb.exceptions.InternalServerError, |
| ) as e: |
| raise GenericDynamoDbError(e.message) from e |
| |
| def _dynamodb_table_exists(self) -> bool: |
| try: |
| response = self.dynamodb.describe_table(TableName=self.dynamodb_table_name) |
| except self.dynamodb.exceptions.ResourceNotFoundException: |
| return False |
| except self.dynamodb.exceptions.InternalServerError as e: |
| raise GenericDynamoDbError(e.message) from e |
| |
| if response["Table"]["TableStatus"] != ACTIVE: |
| raise GenericDynamoDbError(f"DynamoDB table for catalog {self.dynamodb_table_name} is not {ACTIVE}") |
| else: |
| return True |
| |
| def create_table( |
| self, |
| identifier: Union[str, Identifier], |
| schema: Union[Schema, "pa.Schema"], |
| location: Optional[str] = None, |
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, |
| sort_order: SortOrder = UNSORTED_SORT_ORDER, |
| properties: Properties = EMPTY_DICT, |
| ) -> Table: |
| """ |
| Create an Iceberg table. |
| |
| Args: |
| identifier: Table identifier. |
| schema: Table's schema. |
| location: Location for the table. Optional Argument. |
| partition_spec: PartitionSpec for the table. |
| sort_order: SortOrder for the table. |
| properties: Table properties that can be a string based dictionary. |
| |
| Returns: |
| Table: the created table instance. |
| |
| Raises: |
| AlreadyExistsError: If a table with the name already exists. |
| ValueError: If the identifier is invalid, or no path is given to store metadata. |
| |
| """ |
| schema: Schema = self._convert_schema_if_needed(schema) # type: ignore |
| |
| database_name, table_name = self.identifier_to_database_and_table(identifier) |
| |
| location = self._resolve_table_location(location, database_name, table_name) |
| provider = load_location_provider(table_location=location, table_properties=properties) |
| metadata_location = provider.new_table_metadata_file_location() |
| |
| metadata = new_table_metadata( |
| location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties |
| ) |
| io = load_file_io(properties=self.properties, location=metadata_location) |
| self._write_metadata(metadata, io, metadata_location) |
| |
| self._ensure_namespace_exists(database_name=database_name) |
| |
| try: |
| self._put_dynamo_item( |
| item=_get_create_table_item( |
| database_name=database_name, table_name=table_name, properties=properties, metadata_location=metadata_location |
| ), |
| condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})", |
| ) |
| except ConditionalCheckFailedException as e: |
| raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e |
| |
| return self.load_table(identifier=identifier) |
| |
| def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: |
| """Register a new table using existing metadata. |
| |
| Args: |
| identifier Union[str, Identifier]: Table identifier for the table |
| metadata_location str: The location to the metadata |
| |
| Returns: |
| Table: The newly registered table |
| |
| Raises: |
| TableAlreadyExistsError: If the table already exists |
| """ |
| raise NotImplementedError |
| |
| def commit_table( |
| self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] |
| ) -> CommitTableResponse: |
| """Commit updates to a table. |
| |
| Args: |
| table (Table): The table to be updated. |
| requirements: (Tuple[TableRequirement, ...]): Table requirements. |
| updates: (Tuple[TableUpdate, ...]): Table updates. |
| |
| Returns: |
| CommitTableResponse: The updated metadata. |
| |
| Raises: |
| NoSuchTableError: If a table with the given identifier does not exist. |
| CommitFailedException: Requirement not met, or a conflict with a concurrent commit. |
| """ |
| raise NotImplementedError |
| |
| def load_table(self, identifier: Union[str, Identifier]) -> Table: |
| """ |
| Load the table's metadata and returns the table instance. |
| |
| You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError'. |
| Note: This method doesn't scan data stored in the table. |
| |
| Args: |
| identifier: Table identifier. |
| |
| Returns: |
| Table: the table instance with its metadata. |
| |
| Raises: |
| NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. |
| """ |
| database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) |
| dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name) |
| return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item) |
| |
| def drop_table(self, identifier: Union[str, Identifier]) -> None: |
| """Drop a table. |
| |
| Args: |
| identifier: Table identifier. |
| |
| Raises: |
| NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. |
| """ |
| database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) |
| |
| try: |
| self._delete_dynamo_item( |
| namespace=database_name, |
| identifier=f"{database_name}.{table_name}", |
| condition_expression=f"attribute_exists({DYNAMODB_COL_IDENTIFIER})", |
| ) |
| except ConditionalCheckFailedException as e: |
| raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e |
| |
| def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: |
| """Rename a fully classified table name. |
| |
| This method can only rename Iceberg tables in AWS Glue. |
| |
| Args: |
| from_identifier: Existing table identifier. |
| to_identifier: New table identifier. |
| |
| Returns: |
| Table: the updated table instance with its metadata. |
| |
| Raises: |
| ValueError: When from table identifier is invalid. |
| NoSuchTableError: When a table with the name does not exist. |
| NoSuchIcebergTableError: When from table is not a valid iceberg table. |
| NoSuchPropertyException: When from table miss some required properties. |
| NoSuchNamespaceError: When the destination namespace doesn't exist. |
| """ |
| from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError) |
| to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier) |
| |
| from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name) |
| |
| try: |
| # Verify that from_identifier is a valid iceberg table |
| self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=from_table_item) |
| except NoSuchPropertyException as e: |
| raise NoSuchPropertyException( |
| f"Failed to rename table {from_database_name}.{from_table_name} since it is missing required properties" |
| ) from e |
| except NoSuchIcebergTableError as e: |
| raise NoSuchIcebergTableError( |
| f"Failed to rename table {from_database_name}.{from_table_name} since it is not a valid iceberg table" |
| ) from e |
| |
| self._ensure_namespace_exists(database_name=from_database_name) |
| self._ensure_namespace_exists(database_name=to_database_name) |
| |
| try: |
| self._put_dynamo_item( |
| item=_get_rename_table_item( |
| from_dynamo_table_item=from_table_item, to_database_name=to_database_name, to_table_name=to_table_name |
| ), |
| condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})", |
| ) |
| except ConditionalCheckFailedException as e: |
| raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e |
| |
| try: |
| self.drop_table(from_identifier) |
| except (NoSuchTableError, GenericDynamoDbError) as e: |
| log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. " |
| |
| try: |
| self.drop_table(to_identifier) |
| log_message += f"Rolled back table creation for {to_database_name}.{to_table_name}." |
| except (NoSuchTableError, GenericDynamoDbError): |
| log_message += ( |
| f"Failed to roll back table creation for {to_database_name}.{to_table_name}. Please clean up manually" |
| ) |
| |
| raise ValueError(log_message) from e |
| |
| return self.load_table(to_identifier) |
| |
| def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: |
| """Create a namespace in the catalog. |
| |
| Args: |
| namespace: Namespace identifier. |
| properties: A string dictionary of properties for the given namespace. |
| |
| Raises: |
| ValueError: If the identifier is invalid. |
| AlreadyExistsError: If a namespace with the given name already exists. |
| """ |
| database_name = self.identifier_to_database(namespace) |
| |
| try: |
| self._put_dynamo_item( |
| item=_get_create_database_item(database_name=database_name, properties=properties), |
| condition_expression=f"attribute_not_exists({DYNAMODB_COL_NAMESPACE})", |
| ) |
| except ConditionalCheckFailedException as e: |
| raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e |
| |
| def drop_namespace(self, namespace: Union[str, Identifier]) -> None: |
| """Drop a namespace. |
| |
| A Glue namespace can only be dropped if it is empty. |
| |
| Args: |
| namespace: Namespace identifier. |
| |
| Raises: |
| NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. |
| NamespaceNotEmptyError: If the namespace is not empty. |
| """ |
| database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) |
| table_identifiers = self.list_tables(namespace=database_name) |
| |
| if len(table_identifiers) > 0: |
| raise NamespaceNotEmptyError(f"Database {database_name} is not empty") |
| |
| try: |
| self._delete_dynamo_item( |
| namespace=database_name, |
| identifier=DYNAMODB_NAMESPACE, |
| condition_expression=f"attribute_exists({DYNAMODB_COL_IDENTIFIER})", |
| ) |
| except ConditionalCheckFailedException as e: |
| raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e |
| |
| def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: |
| """List Iceberg tables under the given namespace in the catalog. |
| |
| Args: |
| namespace (str | Identifier): Namespace identifier to search. |
| |
| Returns: |
| List[Identifier]: list of table identifiers. |
| """ |
| database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) |
| |
| paginator = self.dynamodb.get_paginator("query") |
| |
| try: |
| page_iterator = paginator.paginate( |
| TableName=self.dynamodb_table_name, |
| IndexName=DYNAMODB_NAMESPACE_GSI, |
| KeyConditionExpression=f"{DYNAMODB_COL_NAMESPACE} = :namespace ", |
| ExpressionAttributeValues={ |
| ":namespace": { |
| "S": database_name, |
| } |
| }, |
| ) |
| except ( |
| self.dynamodb.exceptions.ProvisionedThroughputExceededException, |
| self.dynamodb.exceptions.RequestLimitExceeded, |
| self.dynamodb.exceptions.InternalServerError, |
| self.dynamodb.exceptions.ResourceNotFoundException, |
| ) as e: |
| raise GenericDynamoDbError(e.message) from e |
| |
| table_identifiers = [] |
| for page in page_iterator: |
| for item in page["Items"]: |
| _dict = _convert_dynamo_item_to_regular_dict(item) |
| identifier_col = _dict[DYNAMODB_COL_IDENTIFIER] |
| if identifier_col == DYNAMODB_NAMESPACE: |
| continue |
| |
| table_identifiers.append(self.identifier_to_tuple(identifier_col)) |
| |
| return table_identifiers |
| |
| def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: |
| """List top-level namespaces from the catalog. |
| |
| We do not support hierarchical namespace. |
| |
| Returns: |
| List[Identifier]: a List of namespace identifiers. |
| """ |
| # Hierarchical namespace is not supported. Return an empty list |
| if namespace: |
| return [] |
| |
| paginator = self.dynamodb.get_paginator("query") |
| |
| try: |
| page_iterator = paginator.paginate( |
| TableName=self.dynamodb_table_name, |
| ConsistentRead=True, |
| KeyConditionExpression=f"{DYNAMODB_COL_IDENTIFIER} = :identifier", |
| ExpressionAttributeValues={ |
| ":identifier": { |
| "S": DYNAMODB_NAMESPACE, |
| } |
| }, |
| ) |
| except ( |
| self.dynamodb.exceptions.ProvisionedThroughputExceededException, |
| self.dynamodb.exceptions.RequestLimitExceeded, |
| self.dynamodb.exceptions.InternalServerError, |
| self.dynamodb.exceptions.ResourceNotFoundException, |
| ) as e: |
| raise GenericDynamoDbError(e.message) from e |
| |
| database_identifiers = [] |
| for page in page_iterator: |
| for item in page["Items"]: |
| _dict = _convert_dynamo_item_to_regular_dict(item) |
| namespace_col = _dict[DYNAMODB_COL_NAMESPACE] |
| database_identifiers.append(self.identifier_to_tuple(namespace_col)) |
| |
| return database_identifiers |
| |
| def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: |
| """ |
| Get properties for a namespace. |
| |
| Args: |
| namespace: Namespace identifier. |
| |
| Returns: |
| Properties: Properties for the given namespace. |
| |
| Raises: |
| NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid. |
| """ |
| database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) |
| namespace_item = self._get_iceberg_namespace_item(database_name=database_name) |
| namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item) |
| return _get_namespace_properties(namespace_dict=namespace_dict) |
| |
| def update_namespace_properties( |
| self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT |
| ) -> PropertiesUpdateSummary: |
| """ |
| Remove or update provided property keys for a namespace. |
| |
| Args: |
| namespace: Namespace identifier |
| removals: Set of property keys that need to be removed. Optional Argument. |
| updates: Properties to be updated for the given namespace. |
| |
| Raises: |
| NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid. |
| ValueError: If removals and updates have overlapping keys. |
| """ |
| database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) |
| namespace_item = self._get_iceberg_namespace_item(database_name=database_name) |
| namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item) |
| current_properties = _get_namespace_properties(namespace_dict=namespace_dict) |
| |
| properties_update_summary, updated_properties = self._get_updated_props_and_update_summary( |
| current_properties=current_properties, removals=removals, updates=updates |
| ) |
| |
| try: |
| self._put_dynamo_item( |
| item=_get_update_database_item( |
| namespace_item=namespace_item, |
| updated_properties=updated_properties, |
| ), |
| condition_expression=f"attribute_exists({DYNAMODB_COL_NAMESPACE})", |
| ) |
| except ConditionalCheckFailedException as e: |
| raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e |
| |
| return properties_update_summary |
| |
| def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: |
| raise NotImplementedError |
| |
| def drop_view(self, identifier: Union[str, Identifier]) -> None: |
| raise NotImplementedError |
| |
| def view_exists(self, identifier: Union[str, Identifier]) -> bool: |
| raise NotImplementedError |
| |
| def _get_iceberg_table_item(self, database_name: str, table_name: str) -> Dict[str, Any]: |
| try: |
| return self._get_dynamo_item(identifier=f"{database_name}.{table_name}", namespace=database_name) |
| except ValueError as e: |
| raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e |
| |
| def _get_iceberg_namespace_item(self, database_name: str) -> Dict[str, Any]: |
| try: |
| return self._get_dynamo_item(identifier=DYNAMODB_NAMESPACE, namespace=database_name) |
| except ValueError as e: |
| raise NoSuchNamespaceError(f"Namespace does not exist: {database_name}") from e |
| |
| def _ensure_namespace_exists(self, database_name: str) -> Dict[str, Any]: |
| return self._get_iceberg_namespace_item(database_name) |
| |
| def _get_dynamo_item(self, identifier: str, namespace: str) -> Dict[str, Any]: |
| try: |
| response = self.dynamodb.get_item( |
| TableName=self.dynamodb_table_name, |
| ConsistentRead=True, |
| Key={ |
| DYNAMODB_COL_IDENTIFIER: { |
| "S": identifier, |
| }, |
| DYNAMODB_COL_NAMESPACE: { |
| "S": namespace, |
| }, |
| }, |
| ) |
| if ITEM in response: |
| return response[ITEM] |
| else: |
| raise ValueError(f"Item not found. identifier: {identifier} - namespace: {namespace}") |
| except self.dynamodb.exceptions.ResourceNotFoundException as e: |
| raise ValueError(f"Item not found. identifier: {identifier} - namespace: {namespace}") from e |
| except ( |
| self.dynamodb.exceptions.ProvisionedThroughputExceededException, |
| self.dynamodb.exceptions.RequestLimitExceeded, |
| self.dynamodb.exceptions.InternalServerError, |
| ) as e: |
| raise GenericDynamoDbError(e.message) from e |
| |
| def _put_dynamo_item(self, item: Dict[str, Any], condition_expression: str) -> None: |
| try: |
| self.dynamodb.put_item(TableName=self.dynamodb_table_name, Item=item, ConditionExpression=condition_expression) |
| except self.dynamodb.exceptions.ConditionalCheckFailedException as e: |
| raise ConditionalCheckFailedException(f"Condition expression check failed: {condition_expression} - {item}") from e |
| except ( |
| self.dynamodb.exceptions.ProvisionedThroughputExceededException, |
| self.dynamodb.exceptions.RequestLimitExceeded, |
| self.dynamodb.exceptions.InternalServerError, |
| self.dynamodb.exceptions.ResourceNotFoundException, |
| self.dynamodb.exceptions.ItemCollectionSizeLimitExceededException, |
| self.dynamodb.exceptions.TransactionConflictException, |
| ) as e: |
| raise GenericDynamoDbError(e.message) from e |
| |
| def _delete_dynamo_item(self, namespace: str, identifier: str, condition_expression: str) -> None: |
| try: |
| self.dynamodb.delete_item( |
| TableName=self.dynamodb_table_name, |
| Key={ |
| DYNAMODB_COL_IDENTIFIER: { |
| "S": identifier, |
| }, |
| DYNAMODB_COL_NAMESPACE: { |
| "S": namespace, |
| }, |
| }, |
| ConditionExpression=condition_expression, |
| ) |
| except self.dynamodb.exceptions.ConditionalCheckFailedException as e: |
| raise ConditionalCheckFailedException( |
| f"Condition expression check failed: {condition_expression} - {identifier}" |
| ) from e |
| except ( |
| self.dynamodb.exceptions.ProvisionedThroughputExceededException, |
| self.dynamodb.exceptions.RequestLimitExceeded, |
| self.dynamodb.exceptions.InternalServerError, |
| self.dynamodb.exceptions.ResourceNotFoundException, |
| self.dynamodb.exceptions.ItemCollectionSizeLimitExceededException, |
| self.dynamodb.exceptions.TransactionConflictException, |
| ) as e: |
| raise GenericDynamoDbError(e.message) from e |
| |
| def _convert_dynamo_table_item_to_iceberg_table(self, dynamo_table_item: Dict[str, Any]) -> Table: |
| table_dict = _convert_dynamo_item_to_regular_dict(dynamo_table_item) |
| |
| for prop in [_add_property_prefix(prop) for prop in (TABLE_TYPE, METADATA_LOCATION)] + [ |
| DYNAMODB_COL_IDENTIFIER, |
| DYNAMODB_COL_NAMESPACE, |
| DYNAMODB_COL_CREATED_AT, |
| ]: |
| if prop not in table_dict.keys(): |
| raise NoSuchPropertyException(f"Iceberg required property {prop} is missing: {dynamo_table_item}") |
| |
| table_type = table_dict[_add_property_prefix(TABLE_TYPE)] |
| identifier = table_dict[DYNAMODB_COL_IDENTIFIER] |
| metadata_location = table_dict[_add_property_prefix(METADATA_LOCATION)] |
| database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) |
| |
| if table_type.lower() != ICEBERG: |
| raise NoSuchIcebergTableError( |
| f"Property table_type is {table_type}, expected {ICEBERG}: {database_name}.{table_name}" |
| ) |
| |
| io = load_file_io(properties=self.properties, location=metadata_location) |
| file = io.new_input(metadata_location) |
| metadata = FromInputFile.table_metadata(file) |
| return Table( |
| identifier=(database_name, table_name), |
| metadata=metadata, |
| metadata_location=metadata_location, |
| io=self._load_file_io(metadata.properties, metadata_location), |
| catalog=self, |
| ) |
| |
| |
| def _get_create_table_item(database_name: str, table_name: str, properties: Properties, metadata_location: str) -> Dict[str, Any]: |
| current_timestamp_ms = str(round(time() * 1000)) |
| _dict = { |
| DYNAMODB_COL_IDENTIFIER: { |
| "S": f"{database_name}.{table_name}", |
| }, |
| DYNAMODB_COL_NAMESPACE: { |
| "S": database_name, |
| }, |
| DYNAMODB_COL_VERSION: { |
| "S": str(uuid.uuid4()), |
| }, |
| DYNAMODB_COL_CREATED_AT: { |
| "N": current_timestamp_ms, |
| }, |
| DYNAMODB_COL_UPDATED_AT: { |
| "N": current_timestamp_ms, |
| }, |
| } |
| |
| for key, val in properties.items(): |
| _dict[_add_property_prefix(key)] = {"S": val} |
| |
| _dict[_add_property_prefix(TABLE_TYPE)] = {"S": ICEBERG.upper()} |
| _dict[_add_property_prefix(METADATA_LOCATION)] = {"S": metadata_location} |
| _dict[_add_property_prefix(PREVIOUS_METADATA_LOCATION)] = {"S": ""} |
| |
| return _dict |
| |
| |
| def _get_rename_table_item(from_dynamo_table_item: Dict[str, Any], to_database_name: str, to_table_name: str) -> Dict[str, Any]: |
| _dict = from_dynamo_table_item |
| current_timestamp_ms = str(round(time() * 1000)) |
| _dict[DYNAMODB_COL_IDENTIFIER]["S"] = f"{to_database_name}.{to_table_name}" |
| _dict[DYNAMODB_COL_NAMESPACE]["S"] = to_database_name |
| _dict[DYNAMODB_COL_VERSION]["S"] = str(uuid.uuid4()) |
| _dict[DYNAMODB_COL_UPDATED_AT]["N"] = current_timestamp_ms |
| return _dict |
| |
| |
| def _get_create_database_item(database_name: str, properties: Properties) -> Dict[str, Any]: |
| current_timestamp_ms = str(round(time() * 1000)) |
| _dict = { |
| DYNAMODB_COL_IDENTIFIER: { |
| "S": DYNAMODB_NAMESPACE, |
| }, |
| DYNAMODB_COL_NAMESPACE: { |
| "S": database_name, |
| }, |
| DYNAMODB_COL_VERSION: { |
| "S": str(uuid.uuid4()), |
| }, |
| DYNAMODB_COL_CREATED_AT: { |
| "N": current_timestamp_ms, |
| }, |
| DYNAMODB_COL_UPDATED_AT: { |
| "N": current_timestamp_ms, |
| }, |
| } |
| |
| for key, val in properties.items(): |
| _dict[_add_property_prefix(key)] = {"S": val} |
| |
| return _dict |
| |
| |
| def _get_update_database_item(namespace_item: Dict[str, Any], updated_properties: Properties) -> Dict[str, Any]: |
| current_timestamp_ms = str(round(time() * 1000)) |
| |
| _dict = { |
| DYNAMODB_COL_IDENTIFIER: namespace_item[DYNAMODB_COL_IDENTIFIER], |
| DYNAMODB_COL_NAMESPACE: namespace_item[DYNAMODB_COL_NAMESPACE], |
| DYNAMODB_COL_VERSION: { |
| "S": str(uuid.uuid4()), |
| }, |
| DYNAMODB_COL_CREATED_AT: namespace_item[DYNAMODB_COL_CREATED_AT], |
| DYNAMODB_COL_UPDATED_AT: { |
| "N": current_timestamp_ms, |
| }, |
| } |
| |
| for key, val in updated_properties.items(): |
| _dict[_add_property_prefix(key)] = {"S": val} |
| |
| return _dict |
| |
| |
| CREATE_CATALOG_ATTRIBUTE_DEFINITIONS = [ |
| { |
| "AttributeName": DYNAMODB_COL_IDENTIFIER, |
| "AttributeType": "S", |
| }, |
| { |
| "AttributeName": DYNAMODB_COL_NAMESPACE, |
| "AttributeType": "S", |
| }, |
| ] |
| |
| CREATE_CATALOG_KEY_SCHEMA = [ |
| { |
| "AttributeName": DYNAMODB_COL_IDENTIFIER, |
| "KeyType": "HASH", |
| }, |
| { |
| "AttributeName": DYNAMODB_COL_NAMESPACE, |
| "KeyType": "RANGE", |
| }, |
| ] |
| |
| |
| CREATE_CATALOG_GLOBAL_SECONDARY_INDEXES = [ |
| { |
| "IndexName": DYNAMODB_NAMESPACE_GSI, |
| "KeySchema": [ |
| { |
| "AttributeName": DYNAMODB_COL_NAMESPACE, |
| "KeyType": "HASH", |
| }, |
| { |
| "AttributeName": DYNAMODB_COL_IDENTIFIER, |
| "KeyType": "RANGE", |
| }, |
| ], |
| "Projection": { |
| "ProjectionType": "KEYS_ONLY", |
| }, |
| } |
| ] |
| |
| |
| def _get_namespace_properties(namespace_dict: Dict[str, str]) -> Properties: |
| return {_remove_property_prefix(key): val for key, val in namespace_dict.items() if key.startswith(PROPERTY_KEY_PREFIX)} |
| |
| |
| def _convert_dynamo_item_to_regular_dict(dynamo_json: Dict[str, Any]) -> Dict[str, str]: |
| """Convert a dynamo json to a regular json. |
| |
| Example of a dynamo json: |
| { |
| "AlbumTitle": { |
| "S": "Songs About Life", |
| }, |
| "Artist": { |
| "S": "Acme Band", |
| }, |
| "SongTitle": { |
| "S": "Happy Day", |
| } |
| } |
| |
| Converted to regular json: |
| { |
| "AlbumTitle": "Songs About Life", |
| "Artist": "Acme Band", |
| "SongTitle": "Happy Day" |
| } |
| |
| Only "S" and "N" data types are supported since those are the only ones that Iceberg is utilizing. |
| """ |
| regular_json = {} |
| for column_name, val_dict in dynamo_json.items(): |
| keys = list(val_dict.keys()) |
| |
| if len(keys) != 1: |
| raise ValueError(f"Expecting only 1 key: {keys}") |
| |
| data_type = keys[0] |
| if data_type not in ("S", "N"): |
| raise ValueError("Only S and N data types are supported.") |
| |
| values = list(val_dict.values()) |
| assert len(values) == 1 |
| column_value = values[0] |
| regular_json[column_name] = column_value |
| |
| return regular_json |
| |
| |
| def _add_property_prefix(prop: str) -> str: |
| return PROPERTY_KEY_PREFIX + prop |
| |
| |
| def _remove_property_prefix(prop: str) -> str: |
| return prop.lstrip(PROPERTY_KEY_PREFIX) |