# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from typing import Any, Dict, Generator, List
from uuid import uuid4
import boto3
import pyarrow as pa
import pytest
from botocore.exceptions import ClientError
from pyiceberg.catalog import Catalog, MetastoreCatalog
from pyiceberg.catalog.glue import GLUE_CATALOG_ENDPOINT, GlueCatalog
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import clean_up, get_bucket_name, get_glue_endpoint, get_s3_path
# The number of tables/databases used in the list_tables/list_namespaces tests
LIST_TEST_NUMBER = 2
CATALOG_NAME = "glue"
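
# NOTE: these are live integration tests. They assume valid AWS credentials and the
# S3 bucket / Glue endpoint wired up in tests.conftest (get_bucket_name, get_glue_endpoint);
# without those, every test below fails at setup.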
@pytest.fixture(name="glue", scope="module")
def fixture_glue_client() -> Generator[boto3.client, None, None]:
yield boto3.client("glue")
@pytest.fixture(name="test_catalog", scope="module")
def fixture_test_catalog() -> Generator[Catalog, None, None]:
"""Configure the pre- and post-setting of aws integration test."""
test_catalog = GlueCatalog(
CATALOG_NAME, **{"warehouse": get_s3_path(get_bucket_name()), GLUE_CATALOG_ENDPOINT: get_glue_endpoint()}
)
yield test_catalog
clean_up(test_catalog)
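
# Athena provides an independent read path: if Athena can query what PyIceberg
# committed through Glue, the table metadata is valid beyond this client alone.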
class AthenaQueryHelper:
_athena_client: boto3.client
_s3_resource: boto3.resource
_output_bucket: str
_output_path: str
def __init__(self) -> None:
self._s3_resource = boto3.resource("s3")
self._athena_client = boto3.client("athena")
self._output_bucket = get_bucket_name()
self._output_path = f"athena_results_{uuid4()}"
def get_query_results(self, query: str) -> List[Dict[str, Any]]:
query_execution_id = self._athena_client.start_query_execution(
QueryString=query, ResultConfiguration={"OutputLocation": f"s3://{self._output_bucket}/{self._output_path}"}
)["QueryExecutionId"]
while True:
result = self._athena_client.get_query_execution(QueryExecutionId=query_execution_id)["QueryExecution"]["Status"]
query_status = result["State"]
assert query_status not in [
"FAILED",
"CANCELLED",
], f"""
            Athena query failed or was cancelled:
Query: {query}
Status: {query_status}
Reason: {result["StateChangeReason"]}"""
if query_status not in ["QUEUED", "RUNNING"]:
break
time.sleep(0.5)
        # No pagination for now; assume the test queries do not return large result sets
return self._athena_client.get_query_results(QueryExecutionId=query_execution_id)["ResultSet"]["Rows"]
def clean_up(self) -> None:
bucket = self._s3_resource.Bucket(self._output_bucket)
for obj in bucket.objects.filter(Prefix=f"{self._output_path}/"):
self._s3_resource.Object(bucket.name, obj.key).delete()
@pytest.fixture(name="athena", scope="module")
def fixture_athena_helper() -> Generator[AthenaQueryHelper, None, None]:
query_helper = AthenaQueryHelper()
yield query_helper
query_helper.clean_up()
def test_create_table(
test_catalog: Catalog,
s3: boto3.client,
table_schema_nested: Schema,
table_name: str,
database_name: str,
athena: AthenaQueryHelper,
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
test_catalog.create_table(identifier, table_schema_nested, get_s3_path(get_bucket_name(), database_name, table_name))
table = test_catalog.load_table(identifier)
assert table.name() == identifier
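    # metadata_location is a full s3://bucket/key URI; strip the scheme and bucket
    # so we can HEAD the metadata object directly.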
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
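    # A freshly created table starts at metadata file version 0 (00000-*.metadata.json).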
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
table.append(
pa.Table.from_pylist(
[
{
"foo": "foo_val",
"bar": 1,
"baz": False,
"qux": ["x", "y"],
"quux": {"key": {"subkey": 2}},
"location": [{"latitude": 1.1}],
"person": {"name": "some_name", "age": 23},
}
],
schema=schema_to_pyarrow(table.schema()),
),
)
assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [
{
"Data": [
{"VarCharValue": "foo"},
{"VarCharValue": "bar"},
{"VarCharValue": "baz"},
{"VarCharValue": "qux"},
{"VarCharValue": "quux"},
{"VarCharValue": "location"},
{"VarCharValue": "person"},
]
},
{
"Data": [
{"VarCharValue": "foo_val"},
{"VarCharValue": "1"},
{"VarCharValue": "false"},
{"VarCharValue": "[x, y]"},
{"VarCharValue": "{key={subkey=2}}"},
{"VarCharValue": "[{latitude=1.1, longitude=null}]"},
{"VarCharValue": "{name=some_name, age=23}"},
]
},
]
def test_create_table_with_invalid_location(table_schema_nested: Schema, table_name: str, database_name: str) -> None:
identifier = (database_name, table_name)
    test_catalog_no_warehouse = GlueCatalog(CATALOG_NAME)
test_catalog_no_warehouse.create_namespace(database_name)
with pytest.raises(ValueError):
test_catalog_no_warehouse.create_table(identifier, table_schema_nested)
test_catalog_no_warehouse.drop_namespace(database_name)
def test_create_table_with_default_location(
test_catalog: Catalog, s3: boto3.client, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
test_catalog.create_table(identifier, table_schema_nested)
table = test_catalog.load_table(identifier)
assert table.name() == identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None:
identifier = ("invalid", table_name)
with pytest.raises(NoSuchNamespaceError):
test_catalog.create_table(identifier, table_schema_nested)
def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
test_catalog.create_namespace(database_name)
test_catalog.create_table((database_name, table_name), table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
test_catalog.create_table((database_name, table_name), table_schema_nested)
def test_create_table_if_not_exists_duplicated_table(
test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
test_catalog.create_namespace(database_name)
table1 = test_catalog.create_table((database_name, table_name), table_schema_nested)
table2 = test_catalog.create_table_if_not_exists((database_name, table_name), table_schema_nested)
assert table1.name() == table2.name()
def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
loaded_table = test_catalog.load_table(identifier)
assert table.name() == loaded_table.name()
assert table.metadata_location == loaded_table.metadata_location
assert table.metadata == loaded_table.metadata
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_list: List[str]) -> None:
test_catalog.create_namespace(database_name)
for table_name in table_list:
test_catalog.create_table((database_name, table_name), table_schema_nested)
identifier_list = test_catalog.list_tables(database_name)
assert len(identifier_list) == LIST_TEST_NUMBER
for table_name in table_list:
assert (database_name, table_name) in identifier_list
def test_rename_table(
test_catalog: Catalog, s3: boto3.client, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
new_database_name = f"{database_name}_new"
test_catalog.create_namespace(database_name)
test_catalog.create_namespace(new_database_name)
new_table_name = f"rename-{table_name}"
identifier = (database_name, table_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
assert table.name() == identifier
new_identifier = (new_database_name, new_table_name)
test_catalog.rename_table(identifier, new_identifier)
new_table = test_catalog.load_table(new_identifier)
assert new_table.name() == new_identifier
assert new_table.metadata_location == table.metadata_location
metadata_location = new_table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)
def test_drop_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
test_catalog.drop_table(identifier)
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)
def test_purge_table(
test_catalog: Catalog, s3: boto3.client, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
test_catalog.create_table(identifier, table_schema_nested)
table = test_catalog.load_table(identifier)
assert table.name() == identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
test_catalog.purge_table(identifier)
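    # Unlike drop_table, purge_table also deletes the table's files, so the
    # metadata object should no longer exist in S3.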
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)
with pytest.raises(ClientError):
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:
test_catalog.create_namespace(database_name)
assert (database_name,) in test_catalog.list_namespaces()
def test_create_duplicate_namespace(test_catalog: Catalog, database_name: str) -> None:
test_catalog.create_namespace(database_name)
with pytest.raises(NamespaceAlreadyExistsError):
test_catalog.create_namespace(database_name)
def test_create_namespace_if_not_exists(test_catalog: Catalog, database_name: str) -> None:
test_catalog.create_namespace(database_name)
test_catalog.create_namespace_if_not_exists(database_name)
assert (database_name,) in test_catalog.list_namespaces()
def test_create_namespace_with_comment_and_location(test_catalog: Catalog, database_name: str) -> None:
test_location = get_s3_path(get_bucket_name(), database_name)
test_properties = {
"comment": "this is a test description",
"location": test_location,
}
test_catalog.create_namespace(namespace=database_name, properties=test_properties)
loaded_database_list = test_catalog.list_namespaces()
assert (database_name,) in loaded_database_list
properties = test_catalog.load_namespace_properties(database_name)
assert properties["comment"] == "this is a test description"
assert properties["location"] == test_location
def test_list_namespaces(test_catalog: Catalog, database_list: List[str]) -> None:
for database_name in database_list:
test_catalog.create_namespace(database_name)
db_list = test_catalog.list_namespaces()
for database_name in database_list:
assert (database_name,) in db_list
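    # Glue namespaces are flat, so listing the children of a namespace yields nothing.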
    assert len(test_catalog.list_namespaces(database_list[0])) == 0
def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
test_catalog.create_namespace(database_name)
assert (database_name,) in test_catalog.list_namespaces()
test_catalog.create_table((database_name, table_name), table_schema_nested)
with pytest.raises(NamespaceNotEmptyError):
test_catalog.drop_namespace(database_name)
test_catalog.drop_table((database_name, table_name))
test_catalog.drop_namespace(database_name)
assert (database_name,) not in test_catalog.list_namespaces()
def test_load_namespace_properties(test_catalog: Catalog, database_name: str) -> None:
warehouse_location = get_s3_path(get_bucket_name())
test_properties = {
"comment": "this is a test description",
"location": f"{warehouse_location}/{database_name}.db",
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
test_catalog.create_namespace(database_name, test_properties)
listed_properties = test_catalog.load_namespace_properties(database_name)
for k, v in listed_properties.items():
assert k in test_properties
assert v == test_properties[k]
def test_load_empty_namespace_properties(test_catalog: Catalog, database_name: str) -> None:
test_catalog.create_namespace(database_name)
listed_properties = test_catalog.load_namespace_properties(database_name)
assert listed_properties == {}
def test_load_default_namespace_properties(test_catalog: Catalog, glue: boto3.client, database_name: str) -> None:
    # Simulate creating a database with default settings through the AWS Glue web console
glue.create_database(DatabaseInput={"Name": database_name})
listed_properties = test_catalog.load_namespace_properties(database_name)
assert listed_properties == {}
def test_update_namespace_properties(test_catalog: Catalog, database_name: str) -> None:
warehouse_location = get_s3_path(get_bucket_name())
test_properties = {
"comment": "this is a test description",
"location": f"{warehouse_location}/{database_name}.db",
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
    removals = {"test_property1", "test_property2", "test_property3", "should_not_be_removed"}
updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}
test_catalog.create_namespace(database_name, test_properties)
update_report = test_catalog.update_namespace_properties(database_name, removals, updates)
for k in updates.keys():
assert k in update_report.updated
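    # Keys in `removals` that the namespace never had must be reported as missing
    # rather than removed; "should_not_be_removed" exercises that path.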
for k in removals:
if k == "should_not_removed":
assert k in update_report.missing
else:
assert k in update_report.removed
assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
def test_commit_table_update_schema(
test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str, athena: AthenaQueryHelper
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
original_table_metadata = table.metadata
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
assert original_table_metadata.current_schema_id == 0
assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [
{
"Data": [
{"VarCharValue": "foo"},
{"VarCharValue": "bar"},
{"VarCharValue": "baz"},
{"VarCharValue": "qux"},
{"VarCharValue": "quux"},
{"VarCharValue": "location"},
{"VarCharValue": "person"},
]
}
]
transaction = table.transaction()
update = transaction.update_schema()
update.add_column(path="b", field_type=IntegerType())
update.commit()
transaction.commit_transaction()
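    # Committing the transaction writes a new metadata file (version 1) and makes
    # the new schema current while keeping the old one in `schemas`.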
updated_table_metadata = table.metadata
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.current_schema_id == 1
assert len(updated_table_metadata.schemas) == 2
new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
table.append(
pa.Table.from_pylist(
[
{
"foo": "foo_val",
"bar": 1,
"location": [{"latitude": 1.1}],
"person": {"name": "some_name", "age": 23},
"b": 2,
}
],
schema=schema_to_pyarrow(new_schema),
),
)
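    # Columns the appended row omitted (baz, qux, quux) come back from Athena as
    # null/empty values under the evolved schema.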
assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [
{
"Data": [
{"VarCharValue": "foo"},
{"VarCharValue": "bar"},
{"VarCharValue": "baz"},
{"VarCharValue": "qux"},
{"VarCharValue": "quux"},
{"VarCharValue": "location"},
{"VarCharValue": "person"},
{"VarCharValue": "b"},
]
},
{
"Data": [
{"VarCharValue": "foo_val"},
{"VarCharValue": "1"},
{},
{"VarCharValue": "[]"},
{"VarCharValue": "{}"},
{"VarCharValue": "[{latitude=1.1, longitude=null}]"},
{"VarCharValue": "{name=some_name, age=23}"},
{"VarCharValue": "2"},
]
},
]
def test_commit_table_properties(
test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"})
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
transaction = table.transaction()
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
transaction.remove_properties("test_b")
transaction.commit_transaction()
updated_table_metadata = table.metadata
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"Description": "test_description", "test_a": "test_aa", "test_c": "test_c"}
table_info = glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
assert table_info["Table"]["Description"] == "test_description"
@pytest.mark.parametrize("format_version", [1, 2])
def test_create_table_transaction(
test_catalog: Catalog,
s3: boto3.client,
table_schema_nested: Schema,
table_name: str,
database_name: str,
athena: AthenaQueryHelper,
format_version: int,
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(database_name)
with test_catalog.create_table_transaction(
identifier,
table_schema_nested,
get_s3_path(get_bucket_name(), database_name, table_name),
properties={"format-version": format_version},
) as txn:
df = pa.Table.from_pylist(
[
{
"foo": "foo_val",
"bar": 1,
"baz": False,
"qux": ["x", "y"],
"quux": {"key": {"subkey": 2}},
"location": [{"latitude": 1.1}],
"person": {"name": "some_name", "age": 23},
}
],
schema=schema_to_pyarrow(txn.table_metadata.schema()),
)
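        # fast_append commits the new data files in a fresh manifest without
        # rewriting existing manifests; the files are built from the Arrow table first.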
with txn.update_snapshot().fast_append() as update_snapshot:
data_files = _dataframe_to_data_files(
table_metadata=txn.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=txn._table.io
)
for data_file in data_files:
update_snapshot.append_data_file(data_file)
table = test_catalog.load_table(identifier)
assert table.name() == identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
assert athena.get_query_results(f'SELECT * FROM "{database_name}"."{table_name}"') == [
{
"Data": [
{"VarCharValue": "foo"},
{"VarCharValue": "bar"},
{"VarCharValue": "baz"},
{"VarCharValue": "qux"},
{"VarCharValue": "quux"},
{"VarCharValue": "location"},
{"VarCharValue": "person"},
]
},
{
"Data": [
{"VarCharValue": "foo_val"},
{"VarCharValue": "1"},
{"VarCharValue": "false"},
{"VarCharValue": "[x, y]"},
{"VarCharValue": "{key={subkey=2}}"},
{"VarCharValue": "[{latitude=1.1, longitude=null}]"},
{"VarCharValue": "{name=some_name, age=23}"},
]
},
]
def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
test_catalog.create_namespace(database_name)
test_catalog.create_table((database_name, table_name), table_schema_nested)
assert test_catalog.table_exists((database_name, table_name)) is True
def test_register_table_with_given_location(
test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
identifier = (database_name, table_name)
new_identifier = (database_name, f"new_{table_name}")
test_catalog.create_namespace(database_name)
tbl = test_catalog.create_table(identifier, table_schema_nested)
location = tbl.metadata_location
test_catalog.drop_table(identifier) # drops the table but keeps the metadata file
assert not test_catalog.table_exists(identifier)
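    # register_table attaches the surviving metadata file to a new identifier
    # without rewriting it.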
table = test_catalog.register_table(new_identifier, location)
assert table.name() == new_identifier
assert table.metadata_location == location
assert test_catalog.table_exists(new_identifier)