# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Generator, List

import boto3
import pytest
from botocore.exceptions import ClientError

from pyiceberg.catalog import Catalog
from pyiceberg.catalog.dynamodb import DynamoDbCatalog
from pyiceberg.exceptions import (
    NamespaceAlreadyExistsError,
    NamespaceNotEmptyError,
    NoSuchNamespaceError,
    NoSuchTableError,
    TableAlreadyExistsError,
)
from pyiceberg.schema import Schema

from tests.conftest import clean_up, get_bucket_name, get_s3_path
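
# NOTE: These are AWS integration tests. They exercise DynamoDbCatalog against real DynamoDB
# and S3 services and expect AWS credentials plus a test bucket to be available through the
# helpers in tests.conftest (get_bucket_name, get_s3_path, clean_up).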
# The number of tables/namespaces created in the list_tables/list_namespaces tests
LIST_TEST_NUMBER = 2


@pytest.fixture(name="dynamodb", scope="module")
def fixture_dynamodb_client() -> boto3.client:
    yield boto3.client("dynamodb")


@pytest.fixture(name="test_catalog", scope="module")
def fixture_test_catalog() -> Generator[Catalog, None, None]:
    """Set up the catalog for the AWS integration tests and clean it up afterwards."""
    test_catalog = DynamoDbCatalog("test_ddb_catalog", warehouse=get_s3_path(get_bucket_name()))
    yield test_catalog
    clean_up(test_catalog)


def test_create_table(
    test_catalog: Catalog, s3: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    identifier = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    test_catalog.create_table(identifier, table_schema_nested, get_s3_path(get_bucket_name(), database_name, table_name))
    table = test_catalog.load_table(identifier)
    assert table.name() == identifier
    # Drop the "s3://<bucket>/" prefix from the metadata location and verify the object exists in S3.
    metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
    s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)


def test_create_table_with_invalid_location(table_schema_nested: Schema, database_name: str, table_name: str) -> None:
    identifier = (database_name, table_name)
    # Without a warehouse property the catalog cannot derive a default table location.
    test_catalog_no_warehouse = DynamoDbCatalog("test_ddb_catalog")
    test_catalog_no_warehouse.create_namespace(database_name)
    with pytest.raises(ValueError):
        test_catalog_no_warehouse.create_table(identifier, table_schema_nested)
    test_catalog_no_warehouse.drop_namespace(database_name)


def test_create_table_with_default_location(
    test_catalog: Catalog, s3: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    identifier = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    test_catalog.create_table(identifier, table_schema_nested)
    table = test_catalog.load_table(identifier)
    assert table.name() == identifier
    metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
    s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)


def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None:
    identifier = ("invalid", table_name)
    with pytest.raises(NoSuchNamespaceError):
        test_catalog.create_table(identifier, table_schema_nested)


def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
    test_catalog.create_namespace(database_name)
    test_catalog.create_table((database_name, table_name), table_schema_nested)
    with pytest.raises(TableAlreadyExistsError):
        test_catalog.create_table((database_name, table_name), table_schema_nested)


def test_create_table_if_not_exists_duplicated_table(
    test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    test_catalog.create_namespace(database_name)
    table1 = test_catalog.create_table((database_name, table_name), table_schema_nested)
    table2 = test_catalog.create_table_if_not_exists((database_name, table_name), table_schema_nested)
    assert table1.name() == table2.name()


def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
    identifier = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    loaded_table = test_catalog.load_table(identifier)
    assert table.name() == loaded_table.name()
    assert table.metadata_location == loaded_table.metadata_location
    assert table.metadata == loaded_table.metadata


def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_list: List[str]) -> None:
    test_catalog.create_namespace(database_name)
    for table_name in table_list:
        test_catalog.create_table((database_name, table_name), table_schema_nested)
    identifier_list = test_catalog.list_tables(database_name)
    assert len(identifier_list) == LIST_TEST_NUMBER
    for table_name in table_list:
        assert (database_name, table_name) in identifier_list


def test_rename_table(
    test_catalog: Catalog, s3: boto3.client, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
    new_database_name = f"{database_name}_new"
    test_catalog.create_namespace(database_name)
    test_catalog.create_namespace(new_database_name)
    new_table_name = f"rename-{table_name}"
    identifier = (database_name, table_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    assert table.name() == identifier
    new_identifier = (new_database_name, new_table_name)
    test_catalog.rename_table(identifier, new_identifier)
    new_table = test_catalog.load_table(new_identifier)
    assert new_table.name() == new_identifier
    # Renaming only updates the catalog entry; the table keeps pointing at the same metadata file.
    assert new_table.metadata_location == table.metadata_location
    metadata_location = new_table.metadata_location.split(get_bucket_name())[1][1:]
    s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(identifier)


def test_drop_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    identifier = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    table = test_catalog.create_table(identifier, table_schema_nested)
    assert table.name() == identifier
    test_catalog.drop_table(identifier)
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(identifier)


def test_purge_table(
    test_catalog: Catalog, s3: boto3.client, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
    identifier = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    test_catalog.create_table(identifier, table_schema_nested)
    table = test_catalog.load_table(identifier)
    assert table.name() == identifier
    metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
    s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
    # Purging drops the table from the catalog and deletes its files, so the metadata object
    # should no longer exist in S3 afterwards.
    test_catalog.purge_table(identifier)
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(identifier)
    with pytest.raises(ClientError):
        s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)


def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:
    test_catalog.create_namespace(database_name)
    assert (database_name,) in test_catalog.list_namespaces()


def test_create_duplicate_namespace(test_catalog: Catalog, database_name: str) -> None:
    test_catalog.create_namespace(database_name)
    with pytest.raises(NamespaceAlreadyExistsError):
        test_catalog.create_namespace(database_name)


def test_create_namespace_if_not_exists(test_catalog: Catalog, database_name: str) -> None:
    test_catalog.create_namespace(database_name)
    test_catalog.create_namespace_if_not_exists(database_name)
    assert (database_name,) in test_catalog.list_namespaces()


def test_create_namespace_with_comment_and_location(test_catalog: Catalog, database_name: str) -> None:
    test_location = get_s3_path(get_bucket_name(), database_name)
    test_properties = {
        "comment": "this is a test description",
        "location": test_location,
    }
    test_catalog.create_namespace(namespace=database_name, properties=test_properties)
    loaded_database_list = test_catalog.list_namespaces()
    assert (database_name,) in loaded_database_list
    properties = test_catalog.load_namespace_properties(database_name)
    assert properties["comment"] == "this is a test description"
    assert properties["location"] == test_location


def test_list_namespaces(test_catalog: Catalog, database_list: List[str]) -> None:
    for database_name in database_list:
        test_catalog.create_namespace(database_name)
    db_list = test_catalog.list_namespaces()
    for database_name in database_list:
        assert (database_name,) in db_list
    # Namespaces are not nested in this catalog, so listing under an existing namespace yields nothing.
    assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0


def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    test_catalog.create_namespace(database_name)
    assert (database_name,) in test_catalog.list_namespaces()
    test_catalog.create_table((database_name, table_name), table_schema_nested)
    with pytest.raises(NamespaceNotEmptyError):
        test_catalog.drop_namespace(database_name)
    test_catalog.drop_table((database_name, table_name))
    test_catalog.drop_namespace(database_name)
    assert (database_name,) not in test_catalog.list_namespaces()


def test_load_namespace_properties(test_catalog: Catalog, database_name: str) -> None:
    warehouse_location = get_s3_path(get_bucket_name())
    test_properties = {
        "comment": "this is a test description",
        "location": f"{warehouse_location}/{database_name}.db",
        "test_property1": "1",
        "test_property2": "2",
        "test_property3": "3",
    }
    test_catalog.create_namespace(database_name, test_properties)
    listed_properties = test_catalog.load_namespace_properties(database_name)
    for k, v in listed_properties.items():
        assert k in test_properties
        assert v == test_properties[k]


def test_load_empty_namespace_properties(test_catalog: Catalog, database_name: str) -> None:
    test_catalog.create_namespace(database_name)
    listed_properties = test_catalog.load_namespace_properties(database_name)
    assert listed_properties == {}


def test_update_namespace_properties(test_catalog: Catalog, database_name: str) -> None:
    warehouse_location = get_s3_path(get_bucket_name())
    test_properties = {
        "comment": "this is a test description",
        "location": f"{warehouse_location}/{database_name}.db",
        "test_property1": "1",
        "test_property2": "2",
        "test_property3": "3",
    }
    removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
    updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}
    test_catalog.create_namespace(database_name, test_properties)
    update_report = test_catalog.update_namespace_properties(database_name, removals, updates)
    for k in updates.keys():
        assert k in update_report.updated
    for k in removals:
        if k == "should_not_removed":
            # This key was never set on the namespace, so the report lists it as missing rather than removed.
            assert k in update_report.missing
        else:
            assert k in update_report.removed
    assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]


def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
    test_catalog.create_namespace(database_name)
    test_catalog.create_table((database_name, table_name), table_schema_nested)
    assert test_catalog.table_exists((database_name, table_name)) is True