# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
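"""Tests for the AWS Glue catalog implementation, run against moto's mock_aws stubs for Glue and S3."""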

from typing import List
from unittest import mock

import boto3
import pyarrow as pa
import pytest
from moto import mock_aws
from pyiceberg.catalog.glue import GlueCatalog
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchIcebergTableError,
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import Properties
from pyiceberg.types import IntegerType
from tests.conftest import (
BUCKET_NAME,
TABLE_METADATA_LOCATION_REGEX,
UNIFIED_AWS_SESSION_PROPERTIES,
)


@mock_aws
def test_create_table_with_database_location(
_glue: boto3.client,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db"})
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
# Ensure schema is also pushed to Glue
table_info = _glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
storage_descriptor = table_info["Table"]["StorageDescriptor"]
columns = storage_descriptor["Columns"]
assert len(columns) == len(table_schema_nested.fields)
assert columns[0] == {
"Name": "foo",
"Type": "string",
"Parameters": {"iceberg.field.id": "1", "iceberg.field.optional": "true", "iceberg.field.current": "true"},
}
assert storage_descriptor["Location"] == f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"


@mock_aws
def test_create_v1_table(
_bucket_initialize: None,
_glue: boto3.client,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
catalog_name = "glue"
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db"})
table = test_catalog.create_table((database_name, table_name), table_schema_nested, properties={"format-version": "1"})
assert table.format_version == 1
table_info = _glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
storage_descriptor = table_info["Table"]["StorageDescriptor"]
columns = storage_descriptor["Columns"]
assert len(columns) == len(table_schema_nested.fields)
assert columns[0] == {
"Name": "foo",
"Type": "string",
"Parameters": {"iceberg.field.id": "1", "iceberg.field.optional": "true", "iceberg.field.current": "true"},
}
assert storage_descriptor["Location"] == f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"


@mock_aws
def test_create_table_with_default_warehouse(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_aws
def test_create_table_with_given_location(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(
identifier=identifier, schema=table_schema_nested, location=f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"
)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_aws
def test_create_table_removes_trailing_slash_in_location(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name)
location = f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"
table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, location=f"{location}/")
assert table.name() == identifier
assert table.location() == location
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_aws
def test_create_table_with_pyarrow_schema(
_bucket_initialize: None,
moto_endpoint_url: str,
pyarrow_schema_simple_without_ids: pa.Schema,
database_name: str,
table_name: str,
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(
identifier=identifier,
schema=pyarrow_schema_simple_without_ids,
location=f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}",
)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_aws
def test_create_table_with_no_location(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name)
with pytest.raises(ValueError):
test_catalog.create_table(identifier=identifier, schema=table_schema_nested)


@mock_aws
def test_create_table_with_strips(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db/"})
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_aws
def test_create_table_with_strips_bucket_root(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
table_strip = test_catalog.create_table(identifier, table_schema_nested)
assert table_strip.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table_strip.metadata_location)
assert test_catalog._parse_metadata_version(table_strip.metadata_location) == 0


@mock_aws
def test_create_table_with_no_database(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
with pytest.raises(NoSuchNamespaceError):
test_catalog.create_table(identifier=identifier, schema=table_schema_nested)


@mock_aws
def test_create_table_with_glue_catalog_id(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
catalog_id = "444444444444"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(
catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}", "glue.id": catalog_id}
)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
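    # the namespace should be registered under the configured Glue catalog id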
glue = boto3.client("glue")
databases = glue.get_databases()
assert databases["DatabaseList"][0]["CatalogId"] == catalog_id


@mock_aws
def test_create_duplicated_table(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
test_catalog.create_table(identifier, table_schema_nested)


@mock_aws
def test_load_table(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
table = test_catalog.load_table(identifier)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_aws
def test_load_table_from_self_identifier(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
intermediate = test_catalog.create_table(identifier, table_schema_nested)
table = test_catalog.load_table(intermediate.name())
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)


@mock_aws
def test_load_non_exist_table(_bucket_initialize: None, moto_endpoint_url: str, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)


@mock_aws
def test_drop_table(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
table = test_catalog.load_table(identifier)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
test_catalog.drop_table(identifier)
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)


@mock_aws
def test_drop_table_from_self_identifier(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
table = test_catalog.load_table(identifier)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
test_catalog.drop_table(table.name())
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)
with pytest.raises(NoSuchTableError):
test_catalog.load_table(table.name())


@mock_aws
def test_drop_non_exist_table(_bucket_initialize: None, moto_endpoint_url: str, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
with pytest.raises(NoSuchTableError):
test_catalog.drop_table(identifier)


@mock_aws
def test_rename_table(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
new_table_name = f"{table_name}_new"
identifier = (database_name, table_name)
new_identifier = (database_name, new_table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
test_catalog.rename_table(identifier, new_identifier)
new_table = test_catalog.load_table(new_identifier)
assert new_table.name() == new_identifier
# the metadata_location should not change
assert new_table.metadata_location == table.metadata_location
# old table should be dropped
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)


@mock_aws
def test_rename_table_from_self_identifier(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
new_table_name = f"{table_name}_new"
identifier = (database_name, table_name)
new_identifier = (database_name, new_table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
test_catalog.rename_table(table.name(), new_identifier)
new_table = test_catalog.load_table(new_identifier)
assert new_table.name() == new_identifier
# the metadata_location should not change
assert new_table.metadata_location == table.metadata_location
# old table should be dropped
with pytest.raises(NoSuchTableError):
test_catalog.load_table(identifier)
with pytest.raises(NoSuchTableError):
test_catalog.load_table(table.name())


@mock_aws
def test_rename_table_no_params(
_glue: boto3.client, _bucket_initialize: None, moto_endpoint_url: str, database_name: str, table_name: str
) -> None:
new_database_name = f"{database_name}_new"
new_table_name = f"{table_name}_new"
identifier = (database_name, table_name)
new_identifier = (new_database_name, new_table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_namespace(namespace=new_database_name)
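    # create a plain Glue table without a metadata_location parameter, so rename_table cannot resolve the Iceberg metadata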
_glue.create_table(
DatabaseName=database_name,
TableInput={"Name": table_name, "TableType": "EXTERNAL_TABLE", "Parameters": {"table_type": "iceberg"}},
)
with pytest.raises(NoSuchPropertyException):
test_catalog.rename_table(identifier, new_identifier)


@mock_aws
def test_rename_non_iceberg_table(
_glue: boto3.client, _bucket_initialize: None, moto_endpoint_url: str, database_name: str, table_name: str
) -> None:
new_database_name = f"{database_name}_new"
new_table_name = f"{table_name}_new"
identifier = (database_name, table_name)
new_identifier = (new_database_name, new_table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_namespace(namespace=new_database_name)
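    # create a Glue table whose table_type is not "iceberg"; renaming it must be rejected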
_glue.create_table(
DatabaseName=database_name,
TableInput={
"Name": table_name,
"TableType": "EXTERNAL_TABLE",
"Parameters": {"table_type": "noniceberg", "metadata_location": "test"},
},
)
with pytest.raises(NoSuchIcebergTableError):
test_catalog.rename_table(identifier, new_identifier)


@mock_aws
def test_list_tables(
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_list: List[str],
) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
non_iceberg_table_name = "non_iceberg_table"
non_table_type_table_name = "non_table_type_table"
glue_client = boto3.client("glue", endpoint_url=moto_endpoint_url)
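    # create two tables that list_tables should skip: a non-Iceberg EXTERNAL_TABLE and a table with a different TableType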
glue_client.create_table(
DatabaseName=database_name,
TableInput={
"Name": non_iceberg_table_name,
"TableType": "EXTERNAL_TABLE",
"Parameters": {"table_type": "noniceberg"},
},
)
glue_client.create_table(
DatabaseName=database_name,
TableInput={
"Name": non_table_type_table_name,
"TableType": "OTHER_TABLE_TYPE",
"Parameters": {},
},
)
for table_name in table_list:
test_catalog.create_table((database_name, table_name), table_schema_nested)
loaded_table_list = test_catalog.list_tables(database_name)
assert (database_name, non_iceberg_table_name) not in loaded_table_list
assert (database_name, non_table_type_table_name) not in loaded_table_list
for table_name in table_list:
assert (database_name, table_name) in loaded_table_list


@mock_aws
def test_list_namespaces(_bucket_initialize: None, moto_endpoint_url: str, database_list: List[str]) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
for database_name in database_list:
test_catalog.create_namespace(namespace=database_name)
loaded_database_list = test_catalog.list_namespaces()
for database_name in database_list:
assert (database_name,) in loaded_database_list


@mock_aws
def test_create_namespace_no_properties(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 1
assert (database_name,) in loaded_database_list
properties = test_catalog.load_namespace_properties(database_name)
assert properties == {}


@mock_aws
def test_create_namespace_with_comment_and_location(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_location = f"s3://{BUCKET_NAME}/{database_name}.db"
test_properties = {
"comment": "this is a test description",
"location": test_location,
}
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name, properties=test_properties)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 1
assert (database_name,) in loaded_database_list
properties = test_catalog.load_namespace_properties(database_name)
assert properties["comment"] == "this is a test description"
assert properties["location"] == test_location


@mock_aws
def test_create_duplicated_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 1
assert (database_name,) in loaded_database_list
with pytest.raises(NamespaceAlreadyExistsError):
test_catalog.create_namespace(namespace=database_name, properties={"test": "test"})


@mock_aws
def test_drop_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 1
assert (database_name,) in loaded_database_list
test_catalog.drop_namespace(database_name)
loaded_database_list = test_catalog.list_namespaces()
assert len(loaded_database_list) == 0


@mock_aws
def test_drop_non_empty_namespace(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
assert len(test_catalog.list_tables(database_name)) == 1
with pytest.raises(NamespaceNotEmptyError):
test_catalog.drop_namespace(database_name)


@mock_aws
def test_drop_namespace_that_contains_non_iceberg_tables(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.glue.create_table(DatabaseName=database_name, TableInput={"Name": "hive_table"})
with pytest.raises(NamespaceNotEmptyError):
test_catalog.drop_namespace(database_name)


@mock_aws
def test_drop_non_exist_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
with pytest.raises(NoSuchNamespaceError):
test_catalog.drop_namespace(database_name)


@mock_aws
def test_load_namespace_properties(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_location = f"s3://{BUCKET_NAME}/{database_name}.db"
test_properties = {
"comment": "this is a test description",
"location": test_location,
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(database_name, test_properties)
listed_properties = test_catalog.load_namespace_properties(database_name)
for k, v in listed_properties.items():
assert k in test_properties
assert v == test_properties[k]


@mock_aws
def test_load_non_exist_namespace_properties(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
with pytest.raises(NoSuchNamespaceError):
test_catalog.load_namespace_properties(database_name)


@mock_aws
def test_update_namespace_properties(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_properties = {
"comment": "this is a test description",
"location": f"s3://{BUCKET_NAME}/{database_name}.db",
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(database_name, test_properties)
update_report = test_catalog.update_namespace_properties(database_name, removals, updates)
for k in updates.keys():
assert k in update_report.updated
for k in removals:
if k == "should_not_removed":
assert k in update_report.missing
else:
assert k in update_report.removed
assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
test_catalog.drop_namespace(database_name)


@mock_aws
def test_load_empty_namespace_properties(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(database_name)
listed_properties = test_catalog.load_namespace_properties(database_name)
assert listed_properties == {}


@mock_aws
def test_load_default_namespace_properties(_glue, _bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: # type: ignore
# simulate creating database with default settings through AWS Glue Web Console
_glue.create_database(DatabaseInput={"Name": database_name})
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
listed_properties = test_catalog.load_namespace_properties(database_name)
assert listed_properties == {}


@mock_aws
def test_update_namespace_properties_overlap_update_removal(
_bucket_initialize: None, moto_endpoint_url: str, database_name: str
) -> None:
test_properties = {
"comment": "this is a test description",
"location": f"s3://{BUCKET_NAME}/{database_name}.db",
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
updates = {"test_property1": "4", "test_property5": "5", "comment": "updated test description"}
test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(database_name, test_properties)
with pytest.raises(ValueError):
test_catalog.update_namespace_properties(database_name, removals, updates)
# should not modify the properties
assert test_catalog.load_namespace_properties(database_name) == test_properties


@mock_aws
def test_passing_glue_session_properties() -> None:
session_properties: Properties = {
"glue.access-key-id": "glue.access-key-id",
"glue.secret-access-key": "glue.secret-access-key",
"glue.profile-name": "glue.profile-name",
"glue.region": "glue.region",
"glue.session-token": "glue.session-token",
**UNIFIED_AWS_SESSION_PROPERTIES,
}
with mock.patch("boto3.Session") as mock_session:
test_catalog = GlueCatalog("glue", **session_properties)
mock_session.assert_called_with(
aws_access_key_id="glue.access-key-id",
aws_secret_access_key="glue.secret-access-key",
aws_session_token="glue.session-token",
region_name="glue.region",
profile_name="glue.profile-name",
botocore_session=None,
)
assert test_catalog.glue is mock_session().client()


@mock_aws
def test_passing_unified_session_properties_to_glue() -> None:
session_properties: Properties = {
"glue.profile-name": "glue.profile-name",
**UNIFIED_AWS_SESSION_PROPERTIES,
}
with mock.patch("boto3.Session") as mock_session:
test_catalog = GlueCatalog("glue", **session_properties)
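    # the glue.profile-name property should take precedence, while the remaining credentials fall back to the unified client.* properties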
mock_session.assert_called_with(
aws_access_key_id="client.access-key-id",
aws_secret_access_key="client.secret-access-key",
aws_session_token="client.session-token",
region_name="client.region",
profile_name="glue.profile-name",
botocore_session=None,
)
assert test_catalog.glue is mock_session().client()


@mock_aws
def test_commit_table_update_schema(
_glue: boto3.client,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
original_table_metadata = table.metadata
original_table_metadata_location = table.metadata_location
original_table_last_updated_ms = table.metadata.last_updated_ms
assert TABLE_METADATA_LOCATION_REGEX.match(original_table_metadata_location)
assert test_catalog._parse_metadata_version(original_table_metadata_location) == 0
assert original_table_metadata.current_schema_id == 0
assert len(original_table_metadata.metadata_log) == 0
transaction = table.transaction()
update = transaction.update_schema()
update.add_column(path="b", field_type=IntegerType())
update.commit()
transaction.commit_transaction()
updated_table_metadata = table.metadata
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.current_schema_id == 1
assert len(updated_table_metadata.schemas) == 2
new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
assert len(updated_table_metadata.metadata_log) == 1
assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location
assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms
# Ensure schema is also pushed to Glue
table_info = _glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
storage_descriptor = table_info["Table"]["StorageDescriptor"]
columns = storage_descriptor["Columns"]
assert len(columns) == len(table_schema_nested.fields) + 1
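    # the added column is assigned the next available Iceberg field id and marked as the current field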
assert columns[-1] == {
"Name": "b",
"Type": "int",
"Parameters": {"iceberg.field.id": "18", "iceberg.field.optional": "true", "iceberg.field.current": "true"},
}
assert storage_descriptor["Location"] == f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"


@mock_aws
def test_commit_table_properties(
_glue: boto3.client,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"})
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
transaction = table.transaction()
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
transaction.remove_properties("test_b")
transaction.commit_transaction()
updated_table_metadata = table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"Description": "test_description", "test_a": "test_aa", "test_c": "test_c"}
table_info = _glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
assert table_info["Table"]["Description"] == "test_description"
assert table_info["Table"]["Parameters"]["test_a"] == "test_aa"
assert table_info["Table"]["Parameters"]["test_c"] == "test_c"


@mock_aws
def test_commit_append_table_snapshot_properties(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
table.append(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 1, "baz": False}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_a": "test_prop_a"},
)
updated_table_metadata = table.metadata
summary = updated_table_metadata.snapshots[-1].summary
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert summary is not None
assert summary["snapshot_prop_a"] == "test_prop_a"


@mock_aws
def test_commit_overwrite_table_snapshot_properties(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
table.append(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 1, "baz": False}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_a": "test_prop_a"},
)
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
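    # the overwrite snapshot gets its own summary; properties from the earlier append should not carry over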
table.overwrite(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 2, "baz": True}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_b": "test_prop_b"},
)
updated_table_metadata = table.metadata
summary = updated_table_metadata.snapshots[-1].summary
assert test_catalog._parse_metadata_version(table.metadata_location) == 2
assert summary is not None
assert summary["snapshot_prop_a"] is None
assert summary["snapshot_prop_b"] == "test_prop_b"
assert len(updated_table_metadata.metadata_log) == 2


@mock_aws
@pytest.mark.parametrize("format_version", [1, 2])
def test_create_table_transaction(
_glue: boto3.client,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
format_version: int,
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
with test_catalog.create_table_transaction(
identifier,
table_schema_nested,
partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="foo")),
properties={"format-version": format_version},
) as txn:
last_updated_metadata = txn.table_metadata.last_updated_ms
with txn.update_schema() as update_schema:
update_schema.add_column(path="b", field_type=IntegerType())
with txn.update_spec() as update_spec:
update_spec.add_identity("bar")
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
table = test_catalog.load_table(identifier)
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert table.format_version == format_version
assert table.schema().find_field("b").field_type == IntegerType()
assert table.properties == {"test_a": "test_aa", "test_b": "test_b", "test_c": "test_c"}
assert table.spec().last_assigned_field_id == 1001
assert table.spec().fields_by_source_id(1)[0].name == "foo"
assert table.spec().fields_by_source_id(1)[0].field_id == 1000
assert table.spec().fields_by_source_id(1)[0].transform == IdentityTransform()
assert table.spec().fields_by_source_id(2)[0].name == "bar"
assert table.spec().fields_by_source_id(2)[0].field_id == 1001
assert table.spec().fields_by_source_id(2)[0].transform == IdentityTransform()
assert table.metadata.last_updated_ms > last_updated_metadata


@mock_aws
def test_table_exists(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier=identifier, schema=table_schema_simple)
# Act and Assert for an existing table
assert test_catalog.table_exists(identifier) is True
# Act and Assert for a non-existing table
assert test_catalog.table_exists(("non", "exist")) is False


@mock_aws
def test_register_table_with_given_location(
_bucket_initialize: None, moto_endpoint_url: str, metadata_location: str, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
location = metadata_location
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db"})
table = test_catalog.register_table(identifier, location)
assert table.name() == identifier
assert test_catalog.table_exists(identifier) is True


@mock_aws
def test_glue_endpoint_override(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
catalog_name = "glue"
test_endpoint = "https://test-endpoint"
test_catalog = GlueCatalog(
catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}", "glue.endpoint": test_endpoint}
)
assert test_catalog.glue.meta.endpoint_url == test_endpoint


@mock_aws
def test_glue_client_override() -> None:
catalog_name = "glue"
test_client = boto3.client("glue", region_name="us-west-2")
test_catalog = GlueCatalog(catalog_name, test_client)
assert test_catalog.glue is test_client