blob: 590c90191476196a7d5e4ea252f8007be5e3281f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import uuid
from collections.abc import Generator
from pathlib import Path, PosixPath
import pytest
from pytest_lazy_fixtures import lf
from pyiceberg.catalog import Catalog, MetastoreCatalog, load_catalog
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import (
CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
ValidationError,
)
from pyiceberg.io import WAREHOUSE
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema
from pyiceberg.table.metadata import INITIAL_SPEC_ID
from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder
from pyiceberg.transforms import BucketTransform, DayTransform, IdentityTransform
from pyiceberg.types import IntegerType, LongType, NestedField, TimestampType, UUIDType
from tests.conftest import (
clean_up,
does_support_atomic_concurrent_updates,
does_support_dot_in_identifier,
does_support_nested_namespaces,
does_support_purge_table,
does_support_schema_evolution,
does_support_slash_in_identifier,
)
@pytest.fixture(scope="function")
def memory_catalog(tmp_path: PosixPath) -> Generator[Catalog, None, None]:
    """Yield an ``InMemoryCatalog`` whose warehouse is the per-test ``tmp_path``.

    ``clean_up`` is called on the catalog after the ``yield`` returns.
    """
    catalog_properties = {WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"}
    catalog = InMemoryCatalog("test.in_memory.catalog", **catalog_properties)
    yield catalog
    clean_up(catalog)
@pytest.fixture(scope="function")
def sqlite_catalog_memory(warehouse: Path) -> Generator[Catalog, None, None]:
    """Yield a ``SqlCatalog`` using an in-memory SQLite database and the ``warehouse`` directory."""
    warehouse_uri = f"file://{warehouse}"
    catalog = SqlCatalog("sqlitememory", uri="sqlite:///:memory:", warehouse=warehouse_uri)
    yield catalog
    clean_up(catalog)
@pytest.fixture(scope="function")
def sqlite_catalog_file(warehouse: Path) -> Generator[Catalog, None, None]:
    """Yield a ``SqlCatalog`` whose SQLite database file lives inside the ``warehouse`` directory."""
    database_uri = f"sqlite:////{warehouse}/sql-catalog.db"
    warehouse_uri = f"file://{warehouse}"
    catalog = SqlCatalog("sqlitefile", uri=database_uri, warehouse=warehouse_uri)
    yield catalog
    clean_up(catalog)
@pytest.fixture(scope="function")
def rest_catalog() -> Generator[Catalog, None, None]:
    """Yield a ``RestCatalog`` pointed at ``http://localhost:8181`` and clean it up afterwards."""
    catalog = RestCatalog("rest", uri="http://localhost:8181")
    yield catalog
    clean_up(catalog)
@pytest.fixture(scope="function")
def rest_test_catalog() -> Generator[Catalog, None, None]:
    """Yield the catalog named by the ``PYICEBERG_TEST_CATALOG`` environment variable.

    The requesting test is skipped when the variable is unset or empty.
    """
    catalog_name = os.environ.get("PYICEBERG_TEST_CATALOG")
    if not catalog_name:
        # pytest.skip raises, so nothing below runs without a catalog name.
        pytest.skip("PYICEBERG_TEST_CATALOG environment variables not set")

    catalog = load_catalog(catalog_name)
    yield catalog
    clean_up(catalog)
@pytest.fixture(scope="function")
def hive_catalog() -> Generator[Catalog, None, None]:
    """Yield a ``HiveCatalog`` configured for a local Thrift endpoint and a local S3 endpoint."""
    connection_properties = {
        "uri": "thrift://localhost:9083",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    }
    catalog = HiveCatalog("test_hive_catalog", **connection_properties)
    yield catalog
    clean_up(catalog)
# Lazy references to the catalog fixtures; used as the ``test_catalog`` parametrization below.
CATALOGS = [
    lf(fixture_name)
    for fixture_name in (
        "memory_catalog",
        "sqlite_catalog_memory",
        "sqlite_catalog_file",
        "rest_catalog",
        "hive_catalog",
        "rest_test_catalog",
    )
]
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_table_with_default_location(
    test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
    """A table created without an explicit location loads back with metadata version 0."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    test_catalog.create_table(table_id, table_schema_nested)

    loaded = test_catalog.load_table(table_id)
    assert loaded.name() == table_id
    metadata_version = MetastoreCatalog._parse_metadata_version(loaded.metadata_location)
    assert metadata_version == 0
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None:
    """Creating a table in a namespace this test never creates raises ``NoSuchNamespaceError``."""
    table_id = ("invalid", table_name)
    with pytest.raises(NoSuchNamespaceError):
        test_catalog.create_table(table_id, table_schema_nested)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
    """Creating the same table twice raises ``TableAlreadyExistsError`` on the second attempt."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    test_catalog.create_table(table_id, table_schema_nested)

    with pytest.raises(TableAlreadyExistsError):
        test_catalog.create_table(table_id, table_schema_nested)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_table_if_not_exists_duplicated_table(
    test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
    """``create_table_if_not_exists`` on an existing table returns a table with the same name."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)

    created = test_catalog.create_table(table_id, table_schema_nested)
    fetched = test_catalog.create_table_if_not_exists(table_id, table_schema_nested)

    assert created.name() == fetched.name()
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
    """A loaded table matches the created one in name, metadata location and metadata."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(table_id, table_schema_nested)

    reloaded = test_catalog.load_table(table_id)

    assert created.name() == reloaded.name()
    assert created.metadata_location == reloaded.metadata_location
    assert created.metadata == reloaded.metadata
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_list: list[str]) -> None:
    """``list_tables`` returns exactly one identifier for each table created in the namespace."""
    test_catalog.create_namespace(database_name)
    for name in table_list:
        test_catalog.create_table((database_name, name), table_schema_nested)

    listed = test_catalog.list_tables(database_name)

    assert len(listed) == len(table_list)
    for name in table_list:
        assert (database_name, name) in listed
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_rename_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    """Renaming a table into another namespace keeps its metadata location and removes the old identifier."""
    target_database = f"{database_name}_new"
    test_catalog.create_namespace(database_name)
    test_catalog.create_namespace(target_database)

    source_id = (database_name, table_name)
    original = test_catalog.create_table(source_id, table_schema_nested)
    assert original.name() == source_id

    target_id = (target_database, f"rename-{table_name}")
    test_catalog.rename_table(source_id, target_id)

    renamed = test_catalog.load_table(target_id)
    assert renamed.name() == target_id
    assert renamed.metadata_location == original.metadata_location

    # The old identifier must no longer resolve.
    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(source_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_rename_table_already_exists(
    test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
    """Renaming onto an existing table raises ``TableAlreadyExistsError`` and leaves both tables in place."""
    target_database = f"{database_name}_new"
    test_catalog.create_namespace(database_name)
    test_catalog.create_namespace(target_database)

    source_id = (database_name, table_name)
    source_table = test_catalog.create_table(source_id, table_schema_nested)
    assert source_table.name() == source_id

    target_id = (target_database, f"rename-{table_name}")
    target_table = test_catalog.create_table(target_id, table_schema_nested)
    assert target_table.name() == target_id

    with pytest.raises(TableAlreadyExistsError):
        test_catalog.rename_table(source_id, target_id)

    # Neither table may have been touched by the failed rename.
    assert test_catalog.table_exists(source_id)
    assert test_catalog.table_exists(target_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_drop_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    """A dropped table can no longer be loaded."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(table_id, table_schema_nested)
    assert created.name() == table_id

    test_catalog.drop_table(table_id)

    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(table_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_purge_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    """A purged table can no longer be loaded; skipped for catalogs without purge support."""
    if not does_support_purge_table(test_catalog):
        pytest.skip("Catalog does not support purge_table operation")

    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    test_catalog.create_table(table_id, table_schema_nested)
    loaded = test_catalog.load_table(table_id)
    assert loaded.name() == table_id

    test_catalog.purge_table(table_id)

    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(table_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
    """``table_exists`` returns the literal ``True`` for a table that was just created."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    test_catalog.create_table(table_id, table_schema_nested)

    exists = test_catalog.table_exists(table_id)
    assert exists is True
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
    """A schema change and a spec change made in one transaction are both visible after reloading."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(table_id, test_schema)
    assert test_catalog.table_exists(table_id)

    schema_after = Schema(
        NestedField(1, "VendorID", IntegerType(), False),
        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
        NestedField(3, "new_col", IntegerType(), False),
    )
    spec_after = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "new_col"))

    with created.transaction() as txn:
        with txn.update_schema() as schema_change:
            schema_change.add_column("new_col", IntegerType())
        with txn.update_spec() as spec_change:
            spec_change.add_field("new_col", IdentityTransform())

    reloaded = test_catalog.load_table(table_id)
    assert reloaded.schema().as_struct() == schema_after.as_struct()
    assert reloaded.spec().fields == spec_after.fields
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
    """A pending schema update fails to commit after another handle has committed a schema change first."""
    if not does_support_atomic_concurrent_updates(test_catalog):
        pytest.skip("Catalog does not support atomic concurrent updates")

    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(table_id, test_schema)
    assert test_catalog.table_exists(table_id)

    pending_update = created.update_schema().add_column("new_col", LongType())

    # Commit a competing change through a freshly loaded handle so the pending update fails.
    competing_update = test_catalog.load_table(table_id).update_schema().delete_column("VendorID")
    competing_update.commit()

    schema_after = Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False))

    with pytest.raises(CommitFailedException):
        pending_update.commit()

    # Only the competing change is expected to be visible.
    reloaded = test_catalog.load_table(table_id)
    assert reloaded.schema().as_struct() == schema_after.as_struct()
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
    """A table staged via ``create_table_transaction`` only exists once the transaction is committed."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)

    create_txn = test_catalog.create_table_transaction(table_id, test_schema)
    assert not test_catalog.table_exists(table_id)

    # Committing the schema update inside the transaction must not create the table yet.
    staged_column = create_txn.update_schema().add_column("new_col", IntegerType())
    staged_column.commit()
    assert not test_catalog.table_exists(table_id)

    create_txn.commit_transaction()
    assert test_catalog.table_exists(table_id)

    committed = test_catalog.load_table(table_id)
    assert committed.schema().find_type("new_col").is_primitive
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_table_transaction_multiple_schemas(
    test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, table_name: str, database_name: str
) -> None:
    """Several schema and spec updates staged in a create-transaction all land when it is committed."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)

    create_txn = test_catalog.create_table_transaction(
        identifier=table_id,
        schema=test_schema,
        partition_spec=test_partition_spec,
        sort_order=SortOrder(SortField(source_id=1)),
    )
    assert not test_catalog.table_exists(table_id)

    # The table must stay absent from the catalog after every staged update.
    create_txn.update_schema().add_column("new_col", IntegerType()).commit()
    assert not test_catalog.table_exists(table_id)

    create_txn.update_schema().add_column("new_col_1", UUIDType()).commit()
    assert not test_catalog.table_exists(table_id)

    create_txn.update_spec().add_field("new_col", IdentityTransform()).commit()
    assert not test_catalog.table_exists(table_id)

    # TODO: test replace sort order when available

    schema_after = Schema(
        NestedField(1, "VendorID", IntegerType(), False),
        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
        NestedField(3, "new_col", IntegerType(), False),
        NestedField(4, "new_col_1", UUIDType(), False),
    )
    spec_after = PartitionSpec(
        PartitionField(1, 1000, IdentityTransform(), "VendorID"),
        PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"),
        PartitionField(3, 1002, IdentityTransform(), "new_col"),
    )

    create_txn.commit_transaction()
    assert test_catalog.table_exists(table_id)

    committed = test_catalog.load_table(table_id)
    assert committed.schema().as_struct() == schema_after.as_struct()
    assert committed.schema().schema_id == INITIAL_SCHEMA_ID + 2
    assert committed.spec().fields == spec_after.fields
    assert committed.spec().spec_id == INITIAL_SPEC_ID + 1
    assert committed.sort_order().order_id == INITIAL_SORT_ORDER_ID
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
    """Committing a create-transaction fails when the table was created directly in the meantime."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)

    create_txn = test_catalog.create_table_transaction(identifier=table_id, schema=test_schema)
    assert not test_catalog.table_exists(table_id)

    # Create the same table outside the transaction before the transaction commits.
    test_catalog.create_table(table_id, test_schema)

    with pytest.raises(CommitFailedException):
        create_txn.commit_transaction()
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:
    """A namespace is absent before creation, and both exists and is listed afterwards."""
    assert not test_catalog.namespace_exists(database_name)

    test_catalog.create_namespace(database_name)

    assert test_catalog.namespace_exists(database_name)
    listed = test_catalog.list_namespaces()
    assert (database_name,) in listed
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_duplicate_namespace(test_catalog: Catalog, database_name: str) -> None:
    """Creating the same namespace twice raises ``NamespaceAlreadyExistsError`` on the second attempt."""
    test_catalog.create_namespace(database_name)

    with pytest.raises(NamespaceAlreadyExistsError):
        test_catalog.create_namespace(database_name)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_namepsace_if_not_exists(test_catalog: Catalog, database_name: str) -> None:
    """``create_namespace_if_not_exists`` on an existing namespace leaves it listed."""
    test_catalog.create_namespace(database_name)
    test_catalog.create_namespace_if_not_exists(database_name)

    listed = test_catalog.list_namespaces()
    assert (database_name,) in listed
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_namespace_with_comment(test_catalog: Catalog, database_name: str) -> None:
    """A ``comment`` property passed at namespace creation is returned by ``load_namespace_properties``."""
    initial_properties = {"comment": "this is a test description"}
    test_catalog.create_namespace(namespace=database_name, properties=initial_properties)

    listed = test_catalog.list_namespaces()
    assert (database_name,) in listed

    loaded_properties = test_catalog.load_namespace_properties(database_name)
    assert loaded_properties["comment"] == "this is a test description"
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_list_namespaces(test_catalog: Catalog, database_list: list[str]) -> None:
    """Every created namespace is listed at top level, and listing under the first one yields nothing."""
    for name in database_list:
        test_catalog.create_namespace(name)

    top_level = test_catalog.list_namespaces()
    for name in database_list:
        assert (name,) in top_level

    # No child namespaces were created, so listing below the first namespace must be empty.
    first_database = list(database_list)[0]
    children = test_catalog.list_namespaces(first_database)
    assert len(children) == 0
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    """Dropping a namespace fails while it holds a table and succeeds once the table is dropped."""
    table_id = (database_name, table_name)
    namespace_entry = (database_name,)

    test_catalog.create_namespace(database_name)
    assert namespace_entry in test_catalog.list_namespaces()

    test_catalog.create_table(table_id, table_schema_nested)
    with pytest.raises(NamespaceNotEmptyError):
        test_catalog.drop_namespace(database_name)

    test_catalog.drop_table(table_id)
    test_catalog.drop_namespace(database_name)
    assert namespace_entry not in test_catalog.list_namespaces()
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_load_namespace_properties(test_catalog: Catalog, database_name: str) -> None:
    """Every property given at namespace creation is returned unchanged by ``load_namespace_properties``."""
    initial_properties = {
        "comment": "this is a test description",
        "test_property1": "1",
        "test_property2": "2",
        "test_property3": "3",
    }
    test_catalog.create_namespace(database_name, initial_properties)

    loaded_properties = test_catalog.load_namespace_properties(database_name)

    for key, expected_value in initial_properties.items():
        assert expected_value == loaded_properties[key]
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_namespace_properties(test_catalog: Catalog, database_name: str) -> None:
    """``update_namespace_properties`` reports updated, removed and missing keys and applies the updates."""
    initial_properties = {
        "comment": "this is a test description",
        "test_property1": "1",
        "test_property2": "2",
        "test_property3": "3",
    }
    # "should_not_removed" is not among the initial properties, so it is expected to be reported as missing.
    removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
    updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}

    test_catalog.create_namespace(database_name, initial_properties)
    report = test_catalog.update_namespace_properties(database_name, removals, updates)

    for key in updates:
        assert key in report.updated

    for key in removals:
        expected_bucket = report.missing if key == "should_not_removed" else report.removed
        assert key in expected_bucket

    assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_table_spec(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
    """Adding a bucket partition field through ``update_spec`` is visible on a freshly loaded table."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(table_id, test_schema)

    with created.update_spec() as spec_change:
        spec_change.add_field(source_column_name="VendorID", transform=BucketTransform(16), partition_field_name="shard")

    reloaded = test_catalog.load_table(table_id)
    shard_field = PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="shard")
    spec_after = PartitionSpec(shard_field, spec_id=1)

    # Compares the loaded spec against an expected spec constructed with spec_id=1.
    assert reloaded.spec() == spec_after
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_table_spec_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
    """A pending spec update fails to commit after another handle has committed a spec change first."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)

    bucket_field = PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="id_bucket")
    initial_spec = PartitionSpec(bucket_field)
    created = test_catalog.create_table(table_id, test_schema, partition_spec=initial_spec)

    # Stage, but do not yet commit, a spec update on the original handle.
    pending_update = created.update_spec()
    pending_update.add_field(
        source_column_name="tpep_pickup_datetime", transform=BucketTransform(16), partition_field_name="shard"
    )

    # Commit a competing spec change through a second handle.
    competing_table = test_catalog.load_table(table_id)
    with competing_table.update_spec() as competing_update:
        competing_update.remove_field("id_bucket")

    expected_message = "Requirement failed: default spec id has changed|default partition spec changed"
    with pytest.raises(CommitFailedException, match=expected_message):
        pending_update.commit()

    reloaded = test_catalog.load_table(table_id)
    assert reloaded.spec() == PartitionSpec(spec_id=1)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_table_spec_then_revert(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
    """Adding and then removing a partition field on a v2 table leaves the spec equal to the initial one."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)

    bucket_field = PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="id_bucket")
    initial_spec = PartitionSpec(bucket_field)
    created = test_catalog.create_table(
        table_id, test_schema, partition_spec=initial_spec, properties={"format-version": "2"}
    )
    assert created.format_version == 2

    with created.update_spec() as add_change:
        add_change.add_identity(source_column_name="tpep_pickup_datetime")

    with created.update_spec() as revert_change:
        revert_change.remove_field("tpep_pickup_datetime")

    assert created.spec() == initial_spec
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_register_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    """The metadata location of a dropped table can be registered again under a new name."""
    table_id = (database_name, table_name)
    registered_id = (database_name, "register_table")

    test_catalog.create_namespace_if_not_exists(database_name)
    created = test_catalog.create_table(identifier=table_id, schema=table_schema_nested)
    assert test_catalog.table_exists(table_id)

    test_catalog.drop_table(table_id)
    assert not test_catalog.table_exists(table_id)

    test_catalog.register_table(registered_id, metadata_location=created.metadata_location)
    assert test_catalog.table_exists(registered_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_register_table_existing(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
    """Registering under an identifier that already exists raises ``TableAlreadyExistsError``."""
    table_id = (database_name, table_name)
    test_catalog.create_namespace_if_not_exists(database_name)
    created = test_catalog.create_table(identifier=table_id, schema=table_schema_nested)
    assert test_catalog.table_exists(table_id)

    # The identifier is still taken, so registering onto it must be rejected.
    with pytest.raises(TableAlreadyExistsError):
        test_catalog.register_table(table_id, metadata_location=created.metadata_location)
@pytest.mark.integration
def test_rest_custom_namespace_separator(rest_catalog: RestCatalog, table_schema_simple: Schema) -> None:
    """
    Check that the REST catalog uses ``.`` as its namespace separator.

    After asserting the separator, a uniquely named parent namespace, a child
    namespace and a table inside the child are created, then listed and loaded
    back by their full identifiers.
    """
    assert rest_catalog._namespace_separator == "."

    # A random suffix keeps the parent namespace name unique per run.
    run_suffix = uuid.uuid4().hex
    parent_ns = (f"test_parent_{run_suffix}",)
    child_ns = parent_ns + ("child",)
    table_id = child_ns + ("my_table",)

    rest_catalog.create_namespace(namespace=parent_ns)
    rest_catalog.create_namespace(namespace=child_ns)

    listed_namespaces = rest_catalog.list_namespaces(parent_ns)
    assert child_ns in listed_namespaces

    # Repeat the round trip with a table inside the nested namespace.
    created = rest_catalog.create_table(identifier=table_id, schema=table_schema_simple)
    assert created.name() == table_id

    listed_tables = rest_catalog.list_tables(child_ns)
    assert table_id in listed_tables

    reloaded = rest_catalog.load_table(identifier=table_id)
    assert reloaded.name() == table_id
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_incompatible_partitioned_schema_evolution(
    test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str
) -> None:
    """Deleting a partition source column is rejected unless its partition field is removed in the same transaction."""
    if not does_support_schema_evolution(test_catalog):
        pytest.skip(f"{type(test_catalog).__name__} does not support schema evolution")

    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(table_id, test_schema, partition_spec=test_partition_spec)
    assert test_catalog.table_exists(table_id)

    # Deleting the column while it is still referenced by the spec must fail.
    with pytest.raises(ValidationError), created.update_schema() as rejected_change:
        rejected_change.delete_column("VendorID")

    # The failed update must not have dropped the column.
    assert "VendorID" in created.schema().column_names

    # Removing the partition field first, in the same transaction, makes the delete valid.
    with created.transaction() as txn:
        with txn.update_spec() as spec_change:
            spec_change.remove_field("VendorID")
        with txn.update_schema() as schema_change:
            schema_change.delete_column("VendorID")

    spec_after = PartitionSpec(PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"), spec_id=1)
    assert created.spec() == spec_after
    schema_after = Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False))
    assert created.schema() == schema_after
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_namespace_with_slash(test_catalog: Catalog) -> None:
    """A namespace whose name contains ``/`` can be created, inspected and dropped."""
    if not does_support_slash_in_identifier(test_catalog):
        pytest.skip(f"{type(test_catalog).__name__} does not support slash in namespace")

    slash_namespace = ("new/db",)

    # Start from a clean slate in case the namespace already exists.
    if test_catalog.namespace_exists(slash_namespace):
        test_catalog.drop_namespace(slash_namespace)
    assert not test_catalog.namespace_exists(slash_namespace)

    test_catalog.create_namespace(slash_namespace)
    assert test_catalog.namespace_exists(slash_namespace)

    loaded_properties = test_catalog.load_namespace_properties(slash_namespace)
    assert loaded_properties is not None

    test_catalog.drop_namespace(slash_namespace)
    assert not test_catalog.namespace_exists(slash_namespace)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_incompatible_sorted_schema_evolution(
    test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str
) -> None:
    """Deleting ``VendorID`` from a table created with ``test_sort_order`` is rejected and the schema is unchanged."""
    if not does_support_schema_evolution(test_catalog):
        pytest.skip(f"{type(test_catalog).__name__} does not support schema evolution")

    table_id = (database_name, table_name)
    test_catalog.create_namespace(database_name)
    created = test_catalog.create_table(table_id, test_schema, sort_order=test_sort_order)
    assert test_catalog.table_exists(table_id)

    with pytest.raises(ValidationError), created.update_schema() as rejected_change:
        rejected_change.delete_column("VendorID")

    unchanged_schema = Schema(
        NestedField(1, "VendorID", IntegerType(), False),
        NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
    )
    assert created.schema() == unchanged_schema
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_namespace_with_dot(test_catalog: Catalog) -> None:
    """A namespace whose name contains ``.`` can be created, listed, inspected and dropped."""
    if not does_support_dot_in_identifier(test_catalog):
        pytest.skip(f"{type(test_catalog).__name__} does not support dot in namespace")

    dotted_namespace = ("new.db",)

    # Start from a clean slate in case the namespace already exists.
    if test_catalog.namespace_exists(dotted_namespace):
        test_catalog.drop_namespace(dotted_namespace)
    assert not test_catalog.namespace_exists(dotted_namespace)

    test_catalog.create_namespace(dotted_namespace)
    assert test_catalog.namespace_exists(dotted_namespace)

    # Hierarchical catalogs might treat the dotted name as multiple levels,
    # so for them either the first level or the full name is accepted.
    nested_supported = does_support_nested_namespaces(test_catalog)
    listed = test_catalog.list_namespaces()
    if nested_supported:
        assert ("new",) in listed or ("new.db",) in listed
    else:
        assert dotted_namespace in listed

    loaded_properties = test_catalog.load_namespace_properties(dotted_namespace)
    assert loaded_properties is not None

    test_catalog.drop_namespace(dotted_namespace)
    assert not test_catalog.namespace_exists(dotted_namespace)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_table_name_with_slash(test_catalog: Catalog, table_schema_simple: Schema) -> None:
    """A table whose name contains ``/`` can be created, loaded with the same schema, and dropped."""
    if not does_support_slash_in_identifier(test_catalog):
        pytest.skip(f"{type(test_catalog).__name__} does not support slash in table name")

    parent_namespace = ("ns_slash",)
    slash_table_id = ("ns_slash", "tab/le")

    # Make sure the namespace exists and the table does not before starting.
    if not test_catalog.namespace_exists(parent_namespace):
        test_catalog.create_namespace(parent_namespace)
    if test_catalog.table_exists(slash_table_id):
        test_catalog.drop_table(slash_table_id)
    assert not test_catalog.table_exists(slash_table_id)

    test_catalog.create_table(slash_table_id, table_schema_simple)
    assert test_catalog.table_exists(slash_table_id)

    loaded = test_catalog.load_table(slash_table_id)
    assert loaded.schema().as_struct() == table_schema_simple.as_struct()

    test_catalog.drop_table(slash_table_id)
    assert not test_catalog.table_exists(slash_table_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_table_name_with_dot(test_catalog: Catalog, table_schema_simple: Schema) -> None:
    """A table whose name contains ``.`` can be created, listed, loaded with the same schema, and dropped."""
    if not does_support_dot_in_identifier(test_catalog):
        pytest.skip(f"{type(test_catalog).__name__} does not support dot in table name")

    parent_namespace = ("ns_dot",)
    dotted_table_id = ("ns_dot", "ta.ble")

    # Make sure the namespace exists and the table does not before starting.
    if not test_catalog.namespace_exists(parent_namespace):
        test_catalog.create_namespace(parent_namespace)
    if test_catalog.table_exists(dotted_table_id):
        test_catalog.drop_table(dotted_table_id)
    assert not test_catalog.table_exists(dotted_table_id)

    test_catalog.create_table(dotted_table_id, table_schema_simple)
    assert test_catalog.table_exists(dotted_table_id)
    assert dotted_table_id in test_catalog.list_tables(parent_namespace)

    loaded = test_catalog.load_table(dotted_table_id)
    assert loaded.schema().as_struct() == table_schema_simple.as_struct()

    test_catalog.drop_table(dotted_table_id)
    assert not test_catalog.table_exists(dotted_table_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_drop_missing_table(test_catalog: Catalog, database_name: str) -> None:
    """Dropping a table that does not exist raises ``NoSuchTableError``."""
    test_catalog.create_namespace_if_not_exists(database_name)
    absent_table_id = (database_name, "missing_table")
    assert not test_catalog.table_exists(absent_table_id)

    with pytest.raises(NoSuchTableError):
        test_catalog.drop_table(absent_table_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_drop_nonexistent_namespace(test_catalog: Catalog) -> None:
    """Dropping a namespace this test never creates raises ``NoSuchNamespaceError``."""
    absent_namespace = ("non_existent_namespace",)

    with pytest.raises(NoSuchNamespaceError):
        test_catalog.drop_namespace(absent_namespace)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_rename_table_missing_source_table(test_catalog: Catalog, table_name: str, database_name: str) -> None:
    """Renaming a table that was never created raises ``NoSuchTableError``."""
    test_catalog.create_namespace_if_not_exists(database_name)
    source_id = (database_name, table_name)
    target_id = (database_name, f"rename-{table_name}")

    with pytest.raises(NoSuchTableError):
        test_catalog.rename_table(source_id, target_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_rename_table_destination_namespace_missing(
    test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str
) -> None:
    """Renaming a table into a namespace this test never creates raises ``NoSuchNamespaceError``."""
    test_catalog.create_namespace_if_not_exists(database_name)
    source_id = (database_name, table_name)
    test_catalog.create_table(source_id, table_schema_nested)

    target_id = ("non_existent_namespace", table_name)

    with pytest.raises(NoSuchNamespaceError):
        test_catalog.rename_table(source_id, target_id)
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_load_missing_table(test_catalog: Catalog, database_name: str, table_name: str) -> None:
    """Loading a table that was never created raises ``NoSuchTableError``."""
    test_catalog.create_namespace_if_not_exists(database_name)
    absent_table_id = (database_name, table_name)

    with pytest.raises(NoSuchTableError):
        test_catalog.load_table(absent_table_id)