blob: c7c39a600dfb32f10a88b423d08682afcea57671 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List
from unittest import mock
import boto3
import pyarrow as pa
import pytest
from moto import mock_aws
from pyiceberg.catalog import METADATA_LOCATION, TABLE_TYPE
from pyiceberg.catalog.dynamodb import (
ACTIVE,
DYNAMODB_COL_CREATED_AT,
DYNAMODB_COL_IDENTIFIER,
DYNAMODB_COL_NAMESPACE,
DYNAMODB_TABLE_NAME_DEFAULT,
DynamoDbCatalog,
_add_property_prefix,
)
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchIcebergTableError,
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.schema import Schema
from pyiceberg.typedef import Properties
from tests.conftest import (
BUCKET_NAME,
TABLE_METADATA_LOCATION_REGEX,
UNIFIED_AWS_SESSION_PROPERTIES,
)
@mock_aws
def test_create_dynamodb_catalog_with_table_name(_dynamodb, _bucket_initialize: None) -> None:  # type: ignore
    """After constructing a catalog, ``describe_table`` reports an ACTIVE table under the expected name.

    Checked once for the default table name and once for a name passed via ``table-name``.
    """
    # Default table name.
    DynamoDbCatalog("test_ddb_catalog")
    default_table = _dynamodb.describe_table(TableName=DYNAMODB_TABLE_NAME_DEFAULT)["Table"]
    assert default_table["TableName"] == DYNAMODB_TABLE_NAME_DEFAULT
    assert default_table["TableStatus"] == ACTIVE

    # Explicitly configured table name.
    custom_table_name = "custom_table_name"
    DynamoDbCatalog("test_ddb_catalog", **{"table-name": custom_table_name})
    custom_table = _dynamodb.describe_table(TableName=custom_table_name)["Table"]
    assert custom_table["TableName"] == custom_table_name
    assert custom_table["TableStatus"] == ACTIVE
@mock_aws
def test_create_table_with_database_location(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Create a table in a namespace that has a ``location`` property and check its name and metadata location."""
    namespace_properties = {"location": f"s3://{BUCKET_NAME}/{database_name}.db"}
    catalog = DynamoDbCatalog("test_ddb_catalog", **{"s3.endpoint": moto_endpoint_url})
    catalog.create_namespace(namespace=database_name, properties=namespace_properties)

    expected_name = (database_name, table_name)
    created = catalog.create_table(expected_name, table_schema_nested)

    assert created.name() == expected_name
    assert TABLE_METADATA_LOCATION_REGEX.match(created.metadata_location)
@mock_aws
def test_create_table_with_pyarrow_schema(
    _bucket_initialize: None,
    moto_endpoint_url: str,
    pyarrow_schema_simple_without_ids: pa.Schema,
    database_name: str,
    table_name: str,
) -> None:
    """Create a table from a PyArrow schema (instead of an Iceberg ``Schema``) and check name and metadata location."""
    namespace_properties = {"location": f"s3://{BUCKET_NAME}/{database_name}.db"}
    catalog = DynamoDbCatalog("test_ddb_catalog", **{"s3.endpoint": moto_endpoint_url})
    catalog.create_namespace(namespace=database_name, properties=namespace_properties)

    expected_name = (database_name, table_name)
    created = catalog.create_table(expected_name, pyarrow_schema_simple_without_ids)

    assert created.name() == expected_name
    assert TABLE_METADATA_LOCATION_REGEX.match(created.metadata_location)
@mock_aws
def test_create_table_with_default_warehouse(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Create a table when only the catalog-level ``warehouse`` is set (the namespace has no properties)."""
    catalog_properties = {"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    expected_name = (database_name, table_name)
    created = catalog.create_table(expected_name, table_schema_nested)

    assert created.name() == expected_name
    assert TABLE_METADATA_LOCATION_REGEX.match(created.metadata_location)
@mock_aws
def test_create_table_with_given_location(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Create a table with an explicit ``location`` argument and check its name and metadata location."""
    catalog = DynamoDbCatalog("test_ddb_catalog", **{"s3.endpoint": moto_endpoint_url})
    catalog.create_namespace(namespace=database_name)

    expected_name = (database_name, table_name)
    table_location = f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"
    created = catalog.create_table(identifier=expected_name, schema=table_schema_nested, location=table_location)

    assert created.name() == expected_name
    assert TABLE_METADATA_LOCATION_REGEX.match(created.metadata_location)
@mock_aws
def test_create_table_removes_trailing_slash_in_location(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Pass a location ending in ``/`` and expect ``table.location()`` to equal it without the trailing slash."""
    catalog = DynamoDbCatalog("test_ddb_catalog", **{"s3.endpoint": moto_endpoint_url})
    catalog.create_namespace(namespace=database_name)

    expected_name = (database_name, table_name)
    expected_location = f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"
    created = catalog.create_table(
        identifier=expected_name,
        schema=table_schema_nested,
        location=f"{expected_location}/",
    )

    assert created.name() == expected_name
    assert created.location() == expected_location
    assert TABLE_METADATA_LOCATION_REGEX.match(created.metadata_location)
@mock_aws
def test_create_table_with_no_location(
    _bucket_initialize: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Expect ``ValueError`` when neither the catalog, the namespace nor the call supplies a location."""
    catalog = DynamoDbCatalog("test_ddb_catalog")
    catalog.create_namespace(namespace=database_name)

    target = (database_name, table_name)
    with pytest.raises(ValueError):
        catalog.create_table(identifier=target, schema=table_schema_nested)
@mock_aws
def test_create_table_with_strips(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Create a table in a namespace whose ``location`` ends in ``/`` and check name and metadata location."""
    # Note the trailing slash on the namespace location.
    namespace_properties = {"location": f"s3://{BUCKET_NAME}/{database_name}.db/"}
    catalog = DynamoDbCatalog("test_ddb_catalog", **{"s3.endpoint": moto_endpoint_url})
    catalog.create_namespace(namespace=database_name, properties=namespace_properties)

    expected_name = (database_name, table_name)
    created = catalog.create_table(expected_name, table_schema_nested)

    assert created.name() == expected_name
    assert TABLE_METADATA_LOCATION_REGEX.match(created.metadata_location)
@mock_aws
def test_create_table_with_strips_bucket_root(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Create a table when the catalog ``warehouse`` is the bucket root with a trailing ``/``."""
    # Note the trailing slash on the warehouse URI.
    catalog_properties = {"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    expected_name = (database_name, table_name)
    created = catalog.create_table(expected_name, table_schema_nested)

    assert created.name() == expected_name
    assert TABLE_METADATA_LOCATION_REGEX.match(created.metadata_location)
@mock_aws
def test_create_table_with_no_database(
    _bucket_initialize: None, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Expect ``NoSuchNamespaceError`` when creating a table without first creating its namespace."""
    catalog = DynamoDbCatalog("test_ddb_catalog")
    target = (database_name, table_name)

    with pytest.raises(NoSuchNamespaceError):
        catalog.create_table(identifier=target, schema=table_schema_nested)
@mock_aws
def test_create_duplicated_table(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Expect ``TableAlreadyExistsError`` on the second ``create_table`` call for the same identifier."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    target = (database_name, table_name)
    catalog.create_table(target, table_schema_nested)

    with pytest.raises(TableAlreadyExistsError):
        catalog.create_table(target, table_schema_nested)
@mock_aws
def test_create_table_if_not_exists_duplicated_table(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """``create_table_if_not_exists`` on an existing identifier returns a table with the same name as the original."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    target = (database_name, table_name)
    first = catalog.create_table(target, table_schema_nested)
    second = catalog.create_table_if_not_exists(target, table_schema_nested)

    assert first.name() == second.name()
@mock_aws
def test_load_table(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Load a previously created table and check its name and metadata location."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    expected_name = (database_name, table_name)
    catalog.create_table(expected_name, table_schema_nested)
    loaded = catalog.load_table(expected_name)

    assert loaded.name() == expected_name
    assert TABLE_METADATA_LOCATION_REGEX.match(loaded.metadata_location)
@mock_aws
def test_load_table_from_self_identifier(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Load a table using the identifier returned by another loaded table's ``name()``."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    expected_name = (database_name, table_name)
    catalog.create_table(expected_name, table_schema_nested)

    # Round-trip: feed the loaded table's own identifier back into load_table.
    first_load = catalog.load_table(expected_name)
    reloaded = catalog.load_table(first_load.name())

    assert reloaded.name() == expected_name
    assert TABLE_METADATA_LOCATION_REGEX.match(reloaded.metadata_location)
@mock_aws
def test_load_non_exist_table(_bucket_initialize: None, database_name: str, table_name: str) -> None:
    """Expect ``NoSuchTableError`` when loading a table that was never created in an existing namespace."""
    catalog = DynamoDbCatalog("test_ddb_catalog", warehouse=f"s3://{BUCKET_NAME}")
    catalog.create_namespace(namespace=database_name)

    missing = (database_name, table_name)
    with pytest.raises(NoSuchTableError):
        catalog.load_table(missing)
@mock_aws
def test_drop_table(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Drop an existing table and expect a subsequent ``load_table`` to raise ``NoSuchTableError``."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    target = (database_name, table_name)
    catalog.create_table(target, table_schema_nested)

    # Sanity-check that the table is loadable before dropping it.
    loaded = catalog.load_table(target)
    assert loaded.name() == target
    assert TABLE_METADATA_LOCATION_REGEX.match(loaded.metadata_location)

    catalog.drop_table(target)
    with pytest.raises(NoSuchTableError):
        catalog.load_table(target)
@mock_aws
def test_drop_table_from_self_identifier(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Drop a table via the identifier returned by its own ``name()`` and check it can no longer be loaded."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    target = (database_name, table_name)
    catalog.create_table(target, table_schema_nested)

    loaded = catalog.load_table(target)
    assert loaded.name() == target
    assert TABLE_METADATA_LOCATION_REGEX.match(loaded.metadata_location)

    catalog.drop_table(loaded.name())

    # Neither the original tuple nor the table's own identifier resolves any more.
    with pytest.raises(NoSuchTableError):
        catalog.load_table(target)
    with pytest.raises(NoSuchTableError):
        catalog.load_table(loaded.name())
@mock_aws
def test_drop_non_exist_table(_bucket_initialize: None, database_name: str, table_name: str) -> None:
    """Expect ``NoSuchTableError`` when dropping a table that was never created."""
    catalog = DynamoDbCatalog("test_ddb_catalog", warehouse=f"s3://{BUCKET_NAME}")
    missing = (database_name, table_name)

    with pytest.raises(NoSuchTableError):
        catalog.drop_table(missing)
@mock_aws
def test_rename_table(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Rename a table within its namespace: new name loads with the same metadata location, old name is gone."""
    source_identifier = (database_name, table_name)
    renamed_identifier = (database_name, f"{table_name}_new")

    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    original = catalog.create_table(source_identifier, table_schema_nested)
    assert original.name() == source_identifier
    assert TABLE_METADATA_LOCATION_REGEX.match(original.metadata_location)

    catalog.rename_table(source_identifier, renamed_identifier)

    renamed = catalog.load_table(renamed_identifier)
    assert renamed.name() == renamed_identifier
    # the metadata_location should not change
    assert renamed.metadata_location == original.metadata_location

    # old table should be dropped
    with pytest.raises(NoSuchTableError):
        catalog.load_table(source_identifier)
@mock_aws
def test_rename_table_from_self_identifier(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Rename a table using the identifier returned by its own ``name()`` as the source."""
    source_identifier = (database_name, table_name)
    renamed_identifier = (database_name, f"{table_name}_new")

    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    original = catalog.create_table(source_identifier, table_schema_nested)
    assert original.name() == source_identifier
    assert TABLE_METADATA_LOCATION_REGEX.match(original.metadata_location)

    catalog.rename_table(original.name(), renamed_identifier)

    renamed = catalog.load_table(renamed_identifier)
    assert renamed.name() == renamed_identifier
    # the metadata_location should not change
    assert renamed.metadata_location == original.metadata_location

    # old table should be dropped
    with pytest.raises(NoSuchTableError):
        catalog.load_table(source_identifier)
    with pytest.raises(NoSuchTableError):
        catalog.load_table(original.name())
@mock_aws
def test_fail_on_rename_table_with_missing_required_params(_bucket_initialize: None, database_name: str, table_name: str) -> None:
    """Expect ``NoSuchPropertyException`` when renaming an item that only has identifier and namespace columns."""
    target_database = f"{database_name}_new"
    source_identifier = (database_name, table_name)
    target_identifier = (target_database, f"{table_name}_new")

    catalog = DynamoDbCatalog("test_ddb_catalog", warehouse=f"s3://{BUCKET_NAME}")
    catalog.create_namespace(namespace=database_name)
    catalog.create_namespace(namespace=target_database)

    # Missing required params
    incomplete_item = {
        DYNAMODB_COL_IDENTIFIER: {"S": f"{database_name}.{table_name}"},
        DYNAMODB_COL_NAMESPACE: {"S": database_name},
    }
    # pylint: disable=W0212
    catalog._put_dynamo_item(
        item=incomplete_item,
        condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})",
    )

    with pytest.raises(NoSuchPropertyException):
        catalog.rename_table(source_identifier, target_identifier)
@mock_aws
def test_fail_on_rename_non_iceberg_table(
    _dynamodb: boto3.client, _bucket_initialize: None, database_name: str, table_name: str
) -> None:
    """Expect ``NoSuchIcebergTableError`` when renaming an item whose table-type property is not Iceberg."""
    target_database = f"{database_name}_new"
    source_identifier = (database_name, table_name)
    target_identifier = (target_database, f"{table_name}_new")

    catalog = DynamoDbCatalog("test_ddb_catalog", warehouse=f"s3://{BUCKET_NAME}")
    catalog.create_namespace(namespace=database_name)
    catalog.create_namespace(namespace=target_database)

    # Wrong TABLE_TYPE param
    non_iceberg_item = {
        DYNAMODB_COL_IDENTIFIER: {"S": f"{database_name}.{table_name}"},
        DYNAMODB_COL_NAMESPACE: {"S": database_name},
        DYNAMODB_COL_CREATED_AT: {"S": "test-1873287263487623"},
        _add_property_prefix(TABLE_TYPE): {"S": "non-iceberg-table-type"},
        _add_property_prefix(METADATA_LOCATION): {"S": "test-metadata-location"},
    }
    # pylint: disable=W0212
    catalog._put_dynamo_item(
        item=non_iceberg_item,
        condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})",
    )

    with pytest.raises(NoSuchIcebergTableError):
        catalog.rename_table(source_identifier, target_identifier)
@mock_aws
def test_list_tables(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_list: List[str]
) -> None:
    """Create every table in ``table_list`` and check each one appears in ``list_tables`` for the namespace."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    for name in table_list:
        catalog.create_table((database_name, name), table_schema_nested)

    listed = catalog.list_tables(database_name)
    for name in table_list:
        assert (database_name, name) in listed
@mock_aws
def test_list_namespaces(_bucket_initialize: None, database_list: List[str]) -> None:
    """Create every namespace in ``database_list`` and check each one appears in ``list_namespaces``."""
    catalog = DynamoDbCatalog("test_ddb_catalog")

    for name in database_list:
        catalog.create_namespace(namespace=name)

    listed = catalog.list_namespaces()
    for name in database_list:
        assert (name,) in listed
@mock_aws
def test_create_namespace_no_properties(_bucket_initialize: None, database_name: str) -> None:
    """A namespace created without properties is the only one listed and loads an empty property dict."""
    catalog = DynamoDbCatalog("test_ddb_catalog")
    catalog.create_namespace(namespace=database_name)

    namespaces = catalog.list_namespaces()
    assert len(namespaces) == 1
    assert (database_name,) in namespaces

    assert catalog.load_namespace_properties(database_name) == {}
@mock_aws
def test_create_namespace_with_comment_and_location(_bucket_initialize: None, database_name: str) -> None:
    """A namespace created with ``comment`` and ``location`` returns both values from ``load_namespace_properties``."""
    expected_location = f"s3://{BUCKET_NAME}/{database_name}.db"
    catalog = DynamoDbCatalog("test_ddb_catalog")
    catalog.create_namespace(
        namespace=database_name,
        properties={
            "comment": "this is a test description",
            "location": expected_location,
        },
    )

    namespaces = catalog.list_namespaces()
    assert len(namespaces) == 1
    assert (database_name,) in namespaces

    loaded = catalog.load_namespace_properties(database_name)
    assert loaded["comment"] == "this is a test description"
    assert loaded["location"] == expected_location
@mock_aws
def test_create_duplicated_namespace(_bucket_initialize: None, database_name: str) -> None:
    """Expect ``NamespaceAlreadyExistsError`` when creating a namespace that already exists."""
    catalog = DynamoDbCatalog("test_ddb_catalog")
    catalog.create_namespace(namespace=database_name)

    namespaces = catalog.list_namespaces()
    assert len(namespaces) == 1
    assert (database_name,) in namespaces

    with pytest.raises(NamespaceAlreadyExistsError):
        catalog.create_namespace(namespace=database_name, properties={"test": "test"})
@mock_aws
def test_drop_namespace(_bucket_initialize: None, database_name: str) -> None:
    """Drop the only namespace and check ``list_namespaces`` goes from one entry to none."""
    catalog = DynamoDbCatalog("test_ddb_catalog")
    catalog.create_namespace(namespace=database_name)

    before_drop = catalog.list_namespaces()
    assert len(before_drop) == 1
    assert (database_name,) in before_drop

    catalog.drop_namespace(database_name)

    after_drop = catalog.list_namespaces()
    assert len(after_drop) == 0
@mock_aws
def test_drop_non_empty_namespace(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """Expect ``NamespaceNotEmptyError`` when dropping a namespace that still contains a table."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)
    catalog.create_table((database_name, table_name), table_schema_nested)

    assert len(catalog.list_tables(database_name)) == 1

    with pytest.raises(NamespaceNotEmptyError):
        catalog.drop_namespace(database_name)
@mock_aws
def test_drop_non_exist_namespace(_bucket_initialize: None, database_name: str) -> None:
    """Expect ``NoSuchNamespaceError`` when dropping a namespace that was never created."""
    catalog = DynamoDbCatalog("test_ddb_catalog")

    with pytest.raises(NoSuchNamespaceError):
        catalog.drop_namespace(database_name)
@mock_aws
def test_load_namespace_properties(_bucket_initialize: None, database_name: str) -> None:
    """Check that ``load_namespace_properties`` returns exactly the properties the namespace was created with."""
    test_location = f"s3://{BUCKET_NAME}/{database_name}.db"
    test_properties = {
        "comment": "this is a test description",
        "location": test_location,
        "test_property1": "1",
        "test_property2": "2",
        "test_property3": "3",
    }
    test_catalog = DynamoDbCatalog("test_ddb_catalog")
    test_catalog.create_namespace(database_name, test_properties)
    listed_properties = test_catalog.load_namespace_properties(database_name)
    # Compare the whole mapping. Looping over the *loaded* properties would pass
    # vacuously if the catalog returned an empty dict or dropped some keys.
    assert listed_properties == test_properties
@mock_aws
def test_load_non_exist_namespace_properties(_bucket_initialize: None, database_name: str) -> None:
    """Expect ``NoSuchNamespaceError`` when loading properties of a namespace that was never created."""
    catalog = DynamoDbCatalog("test_ddb_catalog")

    with pytest.raises(NoSuchNamespaceError):
        catalog.load_namespace_properties(database_name)
@mock_aws
def test_update_namespace_properties(_bucket_initialize: None, database_name: str) -> None:
    """Apply removals and updates to a namespace and check how each key is classified in the returned report.

    Updated keys must be in ``updated``, existing removed keys in ``removed``, and the
    one key that never existed in ``missing``. The new ``comment`` value must be readable afterwards.
    """
    initial_properties = {
        "comment": "this is a test description",
        "location": f"s3://{BUCKET_NAME}/{database_name}.db",
        "test_property1": "1",
        "test_property2": "2",
        "test_property3": "3",
    }
    removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
    updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}

    catalog = DynamoDbCatalog("test_ddb_catalog")
    catalog.create_namespace(database_name, initial_properties)
    report = catalog.update_namespace_properties(database_name, removals, updates)

    for updated_key in updates:
        assert updated_key in report.updated

    for removal_key in removals:
        # "should_not_removed" was never set on the namespace, so it is reported as missing.
        expected_bucket = report.missing if removal_key == "should_not_removed" else report.removed
        assert removal_key in expected_bucket

    assert "updated test description" == catalog.load_namespace_properties(database_name)["comment"]
    catalog.drop_namespace(database_name)
@mock_aws
def test_load_empty_namespace_properties(_bucket_initialize: None, database_name: str) -> None:
    """A namespace created without properties loads an empty property dict."""
    catalog = DynamoDbCatalog("test_ddb_catalog")
    catalog.create_namespace(database_name)

    assert catalog.load_namespace_properties(database_name) == {}
@mock_aws
def test_update_namespace_properties_overlap_update_removal(_bucket_initialize: None, database_name: str) -> None:
    """Expect ``ValueError`` when a key is in both removals and updates, with the stored properties unchanged."""
    initial_properties = {
        "comment": "this is a test description",
        "location": f"s3://{BUCKET_NAME}/{database_name}.db",
        "test_property1": "1",
        "test_property2": "2",
        "test_property3": "3",
    }
    # "test_property1" appears in both the removals and the updates.
    removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
    updates = {"test_property1": "4", "test_property5": "5", "comment": "updated test description"}

    catalog = DynamoDbCatalog("test_ddb_catalog")
    catalog.create_namespace(database_name, initial_properties)

    with pytest.raises(ValueError):
        catalog.update_namespace_properties(database_name, removals, updates)

    # should not modify the properties
    assert catalog.load_namespace_properties(database_name) == initial_properties
@mock_aws
def test_passing_glue_session_properties() -> None:
    """Check that ``dynamodb.*`` session properties are the values forwarded to ``boto3.Session``.

    The unified AWS session properties are passed too, but the ``dynamodb.*`` values are the ones expected.
    NOTE(review): the name says "glue", but this test exercises ``DynamoDbCatalog``.
    """
    session_properties: Properties = {
        "dynamodb.access-key-id": "dynamodb.access-key-id",
        "dynamodb.secret-access-key": "dynamodb.secret-access-key",
        "dynamodb.profile-name": "dynamodb.profile-name",
        "dynamodb.region": "dynamodb.region",
        "dynamodb.session-token": "dynamodb.session-token",
        **UNIFIED_AWS_SESSION_PROPERTIES,
    }
    expected_session_kwargs = dict(
        aws_access_key_id="dynamodb.access-key-id",
        aws_secret_access_key="dynamodb.secret-access-key",
        aws_session_token="dynamodb.session-token",
        region_name="dynamodb.region",
        profile_name="dynamodb.profile-name",
        botocore_session=None,
    )

    with mock.patch("boto3.Session") as mock_session:
        # The stub client reports an ACTIVE table from describe_table.
        mock_client = mock.Mock()
        mock_session.return_value.client.return_value = mock_client
        mock_client.describe_table.return_value = {"Table": {"TableStatus": "ACTIVE"}}

        catalog = DynamoDbCatalog("dynamodb", **session_properties)

        mock_session.assert_called_with(**expected_session_kwargs)
        assert catalog.dynamodb is mock_session().client()
@mock_aws
def test_passing_unified_session_properties_to_dynamodb() -> None:
    """Check that unified ``client.*`` values reach ``boto3.Session`` where no ``dynamodb.*`` property is given.

    Only ``dynamodb.profile-name`` is set explicitly, and it is the profile name expected in the session call.
    """
    session_properties: Properties = {
        "dynamodb.profile-name": "dynamodb.profile-name",
        **UNIFIED_AWS_SESSION_PROPERTIES,
    }
    expected_session_kwargs = dict(
        aws_access_key_id="client.access-key-id",
        aws_secret_access_key="client.secret-access-key",
        aws_session_token="client.session-token",
        region_name="client.region",
        profile_name="dynamodb.profile-name",
        botocore_session=None,
    )

    with mock.patch("boto3.Session") as mock_session:
        # The stub client reports an ACTIVE table from describe_table.
        mock_client = mock.Mock()
        mock_session.return_value.client.return_value = mock_client
        mock_client.describe_table.return_value = {"Table": {"TableStatus": "ACTIVE"}}

        catalog = DynamoDbCatalog("dynamodb", **session_properties)

        mock_session.assert_called_with(**expected_session_kwargs)
        assert catalog.dynamodb is mock_session().client()
@mock_aws
def test_table_exists(
    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """``table_exists`` returns ``True`` for a created table and ``False`` for an unknown identifier."""
    catalog_properties = {"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}
    catalog = DynamoDbCatalog("test_ddb_catalog", **catalog_properties)
    catalog.create_namespace(namespace=database_name)

    existing = (database_name, table_name)
    catalog.create_table(existing, table_schema_nested)

    # Act and Assert for an existing table
    assert catalog.table_exists(existing) is True

    # Act and Assert for a non-existing table
    assert catalog.table_exists(("non", "exist")) is False
@mock_aws
def test_dynamodb_client_override() -> None:
    """A client passed as the second positional argument is the object exposed as ``catalog.dynamodb``."""
    injected_client = boto3.client("dynamodb", region_name="us-west-2")
    catalog = DynamoDbCatalog("glue", injected_client)

    assert catalog.dynamodb is injected_client