blob: 00868a57392e47d9296c2f2aefb4453027fdc5d1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from pathlib import Path
from typing import Any, Generator, cast
import pyarrow as pa
import pytest
from pydantic_core import ValidationError
from pytest_lazyfixture import lazy_fixture
from sqlalchemy import Engine, create_engine, inspect
from sqlalchemy.exc import ArgumentError, IntegrityError
from pyiceberg.catalog import (
Catalog,
load_catalog,
)
from pyiceberg.catalog.sql import (
DEFAULT_ECHO_VALUE,
DEFAULT_POOL_PRE_PING_VALUE,
IcebergTables,
SqlCatalog,
SqlCatalogBaseTable,
)
from pyiceberg.exceptions import (
CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.sorting import (
NullOrder,
SortDirection,
SortField,
SortOrder,
)
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import Identifier
from pyiceberg.types import IntegerType, NestedField, StringType, strtobool
CATALOG_TABLES = [c.__tablename__ for c in SqlCatalogBaseTable.__subclasses__()]
@pytest.fixture(scope="module")
def catalog_name() -> str:
return "test_sql_catalog"
@pytest.fixture(name="random_table_identifier")
def fixture_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier:
os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True)
return database_name, table_name
@pytest.fixture(name="another_random_table_identifier")
def fixture_another_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier:
database_name = database_name + "_new"
table_name = table_name + "_new"
os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True)
return database_name, table_name
@pytest.fixture(name="random_hierarchical_identifier")
def fixture_random_hierarchical_identifier(warehouse: Path, hierarchical_namespace_name: str, table_name: str) -> Identifier:
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True)
return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name)))
@pytest.fixture(name="another_random_hierarchical_identifier")
def fixture_another_random_hierarchical_identifier(
warehouse: Path, hierarchical_namespace_name: str, table_name: str
) -> Identifier:
hierarchical_namespace_name = hierarchical_namespace_name + "_new"
table_name = table_name + "_new"
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True)
return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name)))
@pytest.fixture(scope="module")
def catalog_memory(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
"uri": "sqlite:///:memory:",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog(catalog_name, **props)
catalog.create_tables()
yield catalog
catalog.destroy_tables()
@pytest.fixture(scope="module")
def catalog_sqlite(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
"uri": f"sqlite:////{warehouse}/sql-catalog",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog(catalog_name, **props)
catalog.create_tables()
yield catalog
catalog.destroy_tables()
@pytest.fixture(scope="module")
def catalog_uri(warehouse: Path) -> str:
return f"sqlite:////{warehouse}/sql-catalog"
@pytest.fixture(scope="module")
def alchemy_engine(catalog_uri: str) -> Engine:
return create_engine(catalog_uri)
@pytest.fixture(scope="module")
def catalog_sqlite_without_rowcount(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
"uri": f"sqlite:////{warehouse}/sql-catalog",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog(catalog_name, **props)
catalog.engine.dialect.supports_sane_rowcount = False
catalog.create_tables()
yield catalog
catalog.destroy_tables()
@pytest.fixture(scope="module")
def catalog_sqlite_fsspec(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
"uri": f"sqlite:////{warehouse}/sql-catalog",
"warehouse": f"file://{warehouse}",
PY_IO_IMPL: FSSPEC_FILE_IO,
}
catalog = SqlCatalog(catalog_name, **props)
catalog.create_tables()
yield catalog
catalog.destroy_tables()
def test_creation_with_no_uri(catalog_name: str) -> None:
with pytest.raises(NoSuchPropertyException):
SqlCatalog(catalog_name, not_uri="unused")
def test_creation_with_unsupported_uri(catalog_name: str) -> None:
with pytest.raises(ArgumentError):
SqlCatalog(catalog_name, uri="unsupported:xxx")
def test_creation_with_echo_parameter(catalog_name: str, warehouse: Path) -> None:
# echo_param, expected_echo_value
test_cases = [(None, strtobool(DEFAULT_ECHO_VALUE)), ("debug", "debug"), ("true", True), ("false", False)]
for echo_param, expected_echo_value in test_cases:
props = {
"uri": f"sqlite:////{warehouse}/sql-catalog",
"warehouse": f"file://{warehouse}",
}
# None is for default value
if echo_param is not None:
props["echo"] = echo_param
catalog = SqlCatalog(catalog_name, **props)
assert catalog.engine._echo == expected_echo_value, (
f"Assertion failed: expected echo value {expected_echo_value}, "
f"but got {catalog.engine._echo}. For echo_param={echo_param}"
)
def test_creation_with_pool_pre_ping_parameter(catalog_name: str, warehouse: Path) -> None:
# pool_pre_ping_param, expected_pool_pre_ping_value
test_cases = [
(None, strtobool(DEFAULT_POOL_PRE_PING_VALUE)),
("true", True),
("false", False),
]
for pool_pre_ping_param, expected_pool_pre_ping_value in test_cases:
props = {
"uri": f"sqlite:////{warehouse}/sql-catalog",
"warehouse": f"file://{warehouse}",
}
# None is for default value
if pool_pre_ping_param is not None:
props["pool_pre_ping"] = pool_pre_ping_param
catalog = SqlCatalog(catalog_name, **props)
assert catalog.engine.pool._pre_ping == expected_pool_pre_ping_value, (
f"Assertion failed: expected pool_pre_ping value {expected_pool_pre_ping_value}, "
f"but got {catalog.engine.pool._pre_ping}. For pool_pre_ping_param={pool_pre_ping_param}"
)
def test_creation_from_impl(catalog_name: str, warehouse: Path) -> None:
assert isinstance(
load_catalog(
catalog_name,
**{
"py-catalog-impl": "pyiceberg.catalog.sql.SqlCatalog",
"uri": f"sqlite:////{warehouse}/sql-catalog",
"warehouse": f"file://{warehouse}",
},
),
SqlCatalog,
)
def confirm_no_tables_exist(alchemy_engine: Engine) -> None:
inspector = inspect(alchemy_engine)
for c in SqlCatalogBaseTable.__subclasses__():
if inspector.has_table(c.__tablename__):
c.__table__.drop(alchemy_engine)
any_table_exists = any(t for t in inspector.get_table_names() if t in CATALOG_TABLES)
if any_table_exists:
pytest.raises(TableAlreadyExistsError, "Tables exist, but should not have been created yet")
def confirm_all_tables_exist(catalog: SqlCatalog) -> None:
all_tables_exists = True
for t in CATALOG_TABLES:
if t not in inspect(catalog.engine).get_table_names():
all_tables_exists = False
assert isinstance(catalog, SqlCatalog), "Catalog should be a SQLCatalog"
assert all_tables_exists, "Tables should have been created"
def load_catalog_for_catalog_table_creation(catalog_name: str, catalog_uri: str) -> SqlCatalog:
catalog = load_catalog(
catalog_name,
type="sql",
uri=catalog_uri,
init_catalog_tables="true",
)
return cast(SqlCatalog, catalog)
def test_creation_when_no_tables_exist(alchemy_engine: Engine, catalog_name: str, catalog_uri: str) -> None:
confirm_no_tables_exist(alchemy_engine)
catalog = load_catalog_for_catalog_table_creation(catalog_name=catalog_name, catalog_uri=catalog_uri)
confirm_all_tables_exist(catalog)
def test_creation_when_one_tables_exists(alchemy_engine: Engine, catalog_name: str, catalog_uri: str) -> None:
confirm_no_tables_exist(alchemy_engine)
# Create one table
inspector = inspect(alchemy_engine)
IcebergTables.__table__.create(bind=alchemy_engine)
assert IcebergTables.__tablename__ in [t for t in inspector.get_table_names() if t in CATALOG_TABLES]
catalog = load_catalog_for_catalog_table_creation(catalog_name=catalog_name, catalog_uri=catalog_uri)
confirm_all_tables_exist(catalog)
def test_creation_when_all_tables_exists(alchemy_engine: Engine, catalog_name: str, catalog_uri: str) -> None:
confirm_no_tables_exist(alchemy_engine)
# Create all tables
inspector = inspect(alchemy_engine)
SqlCatalogBaseTable.metadata.create_all(bind=alchemy_engine)
for c in CATALOG_TABLES:
assert c in [t for t in inspector.get_table_names() if t in CATALOG_TABLES]
catalog = load_catalog_for_catalog_table_creation(catalog_name=catalog_name, catalog_uri=catalog_uri)
confirm_all_tables_exist(catalog)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_create_tables_idempotency(catalog: SqlCatalog) -> None:
# Second initialization should not fail even if tables are already created
catalog.create_tables()
catalog.create_tables()
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_create_table_default_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
assert table.sort_order().order_id == 0, "Order ID must match"
assert table.sort_order().is_unsorted is True, "Order must be unsorted"
catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested, properties={"format-version": "1"})
assert table.sort_order().order_id == 0, "Order ID must match"
assert table.sort_order().is_unsorted is True, "Order must be unsorted"
assert table.format_version == 1
assert table.spec() == UNPARTITIONED_PARTITION_SPEC
catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_create_table_with_pyarrow_schema(
catalog: SqlCatalog,
pyarrow_schema_simple_without_ids: pa.Schema,
iceberg_table_schema_simple: Schema,
table_identifier: Identifier,
) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, pyarrow_schema_simple_without_ids)
assert table.schema() == iceberg_table_schema_simple
catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_write_pyarrow_schema(catalog: SqlCatalog, table_identifier: Identifier) -> None:
import pyarrow as pa
pyarrow_table = pa.Table.from_arrays(
[
pa.array([None, "A", "B", "C"]), # 'foo' column
pa.array([1, 2, 3, 4]), # 'bar' column
pa.array([True, None, False, True]), # 'baz' column
pa.array([None, "A", "B", "C"]), # 'large' column
],
schema=pa.schema(
[
pa.field("foo", pa.large_string(), nullable=True),
pa.field("bar", pa.int32(), nullable=False),
pa.field("baz", pa.bool_(), nullable=True),
pa.field("large", pa.large_string(), nullable=True),
]
),
)
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, pyarrow_table.schema)
table.append(pyarrow_table)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_create_table_custom_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST))
table = catalog.create_table(table_identifier, table_schema_nested, sort_order=order)
given_sort_order = table.sort_order()
assert given_sort_order.order_id == 1, "Order ID must match"
assert len(given_sort_order.fields) == 1, "Order must have 1 field"
assert given_sort_order.fields[0].direction == SortDirection.ASC, "Direction must match"
assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST, "Null order must match"
assert isinstance(given_sort_order.fields[0].transform, IdentityTransform), "Transform must match"
catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_create_table_with_default_warehouse_location(
warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier
) -> None:
identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
catalog.create_table(table_identifier, table_schema_nested)
table = catalog.load_table(table_identifier)
assert table.name() == identifier_tuple
assert table.metadata_location.startswith(f"file://{warehouse}")
assert os.path.exists(table.metadata_location[len("file://") :])
catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_create_table_with_given_location_removes_trailing_slash(
warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier
) -> None:
identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
namespace = Catalog.namespace_from(table_identifier)
table_name = Catalog.table_name_from(identifier_tuple)
location = f"file://{warehouse}/{catalog.name}/{table_name}-given"
catalog.create_namespace(namespace)
catalog.create_table(table_identifier, table_schema_nested, location=f"{location}/")
table = catalog.load_table(table_identifier)
assert table.name() == identifier_tuple
assert table.metadata_location.startswith(f"file://{warehouse}")
assert os.path.exists(table.metadata_location[len("file://") :])
assert table.location() == location
catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
catalog.create_table(table_identifier, table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
catalog.create_table(table_identifier, table_schema_nested)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_create_table_if_not_exists_duplicated_table(
catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier
) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table1 = catalog.create_table(table_identifier, table_schema_nested)
table2 = catalog.create_table_if_not_exists(table_identifier, table_schema_nested)
assert table1.name() == table2.name()
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_create_table_with_non_existing_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_name: str) -> None:
identifier = ("invalid", table_name)
with pytest.raises(NoSuchNamespaceError):
catalog.create_table(identifier, table_schema_nested)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_create_table_without_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_name: str) -> None:
with pytest.raises(NoSuchNamespaceError):
catalog.create_table(table_name, table_schema_nested)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_register_table(catalog: SqlCatalog, table_identifier: Identifier, metadata_location: str) -> None:
identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.register_table(table_identifier, metadata_location)
assert table.name() == identifier_tuple
assert table.metadata_location == metadata_location
assert os.path.exists(metadata_location)
catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_register_existing_table(catalog: SqlCatalog, table_identifier: Identifier, metadata_location: str) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
catalog.register_table(table_identifier, metadata_location)
with pytest.raises(TableAlreadyExistsError):
catalog.register_table(table_identifier, metadata_location)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_register_table_with_non_existing_namespace(catalog: SqlCatalog, metadata_location: str, table_name: str) -> None:
identifier = ("invalid", table_name)
with pytest.raises(NoSuchNamespaceError):
catalog.register_table(identifier, metadata_location)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_register_table_without_namespace(catalog: SqlCatalog, metadata_location: str, table_name: str) -> None:
with pytest.raises(ValueError):
catalog.register_table(table_name, metadata_location)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
loaded_table = catalog.load_table(table_identifier)
assert table.name() == loaded_table.name()
assert table.metadata_location == loaded_table.metadata_location
assert table.metadata == loaded_table.metadata
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
intermediate = catalog.load_table(table_identifier)
assert intermediate.name() == identifier_tuple
loaded_table = catalog.load_table(intermediate.name())
assert table.name() == loaded_table.name()
assert table.metadata_location == loaded_table.metadata_location
assert table.metadata == loaded_table.metadata
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
assert table.name() == identifier_tuple
catalog.drop_table(table_identifier)
with pytest.raises(NoSuchTableError):
catalog.load_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
identifier_tuple = Catalog.identifier_to_tuple(table_identifier)
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
assert table.name() == identifier_tuple
catalog.drop_table(table.name())
with pytest.raises(NoSuchTableError):
catalog.load_table(table.name())
with pytest.raises(NoSuchTableError):
catalog.load_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_drop_table_that_does_not_exist(catalog: SqlCatalog, table_identifier: Identifier) -> None:
with pytest.raises(NoSuchTableError):
catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"from_table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
@pytest.mark.parametrize(
"to_table_identifier",
[
lazy_fixture("another_random_table_identifier"),
lazy_fixture("another_random_hierarchical_identifier"),
],
)
def test_rename_table(
catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier
) -> None:
from_namespace = Catalog.namespace_from(from_table_identifier)
to_namespace = Catalog.namespace_from(to_table_identifier)
catalog.create_namespace(from_namespace)
catalog.create_namespace(to_namespace)
table = catalog.create_table(from_table_identifier, table_schema_nested)
assert table.name() == from_table_identifier
catalog.rename_table(from_table_identifier, to_table_identifier)
new_table = catalog.load_table(to_table_identifier)
assert new_table.name() == to_table_identifier
assert new_table.metadata_location == table.metadata_location
with pytest.raises(NoSuchTableError):
catalog.load_table(from_table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"from_table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
@pytest.mark.parametrize(
"to_table_identifier",
[
lazy_fixture("another_random_table_identifier"),
lazy_fixture("another_random_hierarchical_identifier"),
],
)
def test_rename_table_from_self_identifier(
catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier
) -> None:
from_namespace = Catalog.namespace_from(from_table_identifier)
to_namespace = Catalog.namespace_from(to_table_identifier)
catalog.create_namespace(from_namespace)
catalog.create_namespace(to_namespace)
table = catalog.create_table(from_table_identifier, table_schema_nested)
assert table.name() == from_table_identifier
catalog.rename_table(table.name(), to_table_identifier)
new_table = catalog.load_table(to_table_identifier)
assert new_table.name() == to_table_identifier
assert new_table.metadata_location == table.metadata_location
with pytest.raises(NoSuchTableError):
catalog.load_table(table.name())
with pytest.raises(NoSuchTableError):
catalog.load_table(from_table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"from_table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
@pytest.mark.parametrize(
"to_table_identifier",
[
lazy_fixture("another_random_table_identifier"),
lazy_fixture("another_random_hierarchical_identifier"),
],
)
def test_rename_table_to_existing_one(
catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier
) -> None:
from_namespace = Catalog.namespace_from(from_table_identifier)
to_namespace = Catalog.namespace_from(to_table_identifier)
catalog.create_namespace(from_namespace)
catalog.create_namespace(to_namespace)
table = catalog.create_table(from_table_identifier, table_schema_nested)
assert table.name() == from_table_identifier
new_table = catalog.create_table(to_table_identifier, table_schema_nested)
assert new_table.name() == to_table_identifier
with pytest.raises(TableAlreadyExistsError):
catalog.rename_table(from_table_identifier, to_table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"from_table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
@pytest.mark.parametrize(
"to_table_identifier",
[
lazy_fixture("another_random_table_identifier"),
lazy_fixture("another_random_hierarchical_identifier"),
],
)
def test_rename_missing_table(catalog: SqlCatalog, from_table_identifier: Identifier, to_table_identifier: Identifier) -> None:
to_namespace = Catalog.namespace_from(to_table_identifier)
catalog.create_namespace(to_namespace)
with pytest.raises(NoSuchTableError):
catalog.rename_table(from_table_identifier, to_table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"from_table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
@pytest.mark.parametrize(
"to_table_identifier",
[
lazy_fixture("another_random_table_identifier"),
lazy_fixture("another_random_hierarchical_identifier"),
],
)
def test_rename_table_to_missing_namespace(
catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier
) -> None:
from_namespace = Catalog.namespace_from(from_table_identifier)
catalog.create_namespace(from_namespace)
table = catalog.create_table(from_table_identifier, table_schema_nested)
assert table.name() == from_table_identifier
with pytest.raises(NoSuchNamespaceError):
catalog.rename_table(from_table_identifier, to_table_identifier)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier_1",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
@pytest.mark.parametrize(
"table_identifier_2",
[
lazy_fixture("another_random_table_identifier"),
lazy_fixture("another_random_hierarchical_identifier"),
],
)
def test_list_tables(
catalog: SqlCatalog, table_schema_nested: Schema, table_identifier_1: Identifier, table_identifier_2: Identifier
) -> None:
namespace_1 = Catalog.namespace_from(table_identifier_1)
namespace_2 = Catalog.namespace_from(table_identifier_2)
catalog.create_namespace(namespace_1)
catalog.create_namespace(namespace_2)
catalog.create_table(table_identifier_1, table_schema_nested)
catalog.create_table(table_identifier_2, table_schema_nested)
identifier_list = catalog.list_tables(namespace_1)
assert len(identifier_list) == 1
assert table_identifier_1 in identifier_list
identifier_list = catalog.list_tables(namespace_2)
assert len(identifier_list) == 1
assert table_identifier_2 in identifier_list
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
def test_list_tables_when_missing_namespace(catalog: SqlCatalog, namespace: str) -> None:
with pytest.raises(NoSuchNamespaceError):
catalog.list_tables(namespace)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_create_namespace_if_not_exists(catalog: SqlCatalog, database_name: str) -> None:
catalog.create_namespace(database_name)
assert (database_name,) in catalog.list_namespaces()
catalog.create_namespace_if_not_exists(database_name)
assert (database_name,) in catalog.list_namespaces()
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
def test_create_namespace(catalog: SqlCatalog, namespace: str) -> None:
catalog.create_namespace(namespace)
assert (Catalog.identifier_to_tuple(namespace)[:1]) in catalog.list_namespaces()
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
def test_create_duplicate_namespace(catalog: SqlCatalog, namespace: str) -> None:
catalog.create_namespace(namespace)
with pytest.raises(NamespaceAlreadyExistsError):
catalog.create_namespace(namespace)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, namespace: str) -> None:
catalog.create_namespace(namespace + "_1")
# Second namespace is a prefix of the first one, make sure it can be added.
catalog.create_namespace(namespace)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
def test_create_namespace_with_comment_and_location(catalog: SqlCatalog, namespace: str) -> None:
test_location = "/test/location"
test_properties = {
"comment": "this is a test description",
"location": test_location,
}
catalog.create_namespace(namespace=namespace, properties=test_properties)
loaded_database_list = catalog.list_namespaces()
assert Catalog.identifier_to_tuple(namespace)[:1] in loaded_database_list
properties = catalog.load_namespace_properties(namespace)
assert properties["comment"] == "this is a test description"
assert properties["location"] == test_location
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
@pytest.mark.filterwarnings("ignore")
def test_create_namespace_with_null_properties(catalog: SqlCatalog, namespace: str) -> None:
with pytest.raises(IntegrityError):
catalog.create_namespace(namespace=namespace, properties={None: "value"}) # type: ignore
with pytest.raises(IntegrityError):
catalog.create_namespace(namespace=namespace, properties={"key": None})
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("empty_namespace", ["", (), (""), ("", ""), " ", (" ")])
def test_create_namespace_with_empty_identifier(catalog: SqlCatalog, empty_namespace: Any) -> None:
with pytest.raises(NoSuchNamespaceError):
catalog.create_namespace(empty_namespace)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_namespace_exists(catalog: SqlCatalog) -> None:
for ns in [("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1", "ns2")]:
catalog.create_namespace(ns)
assert catalog._namespace_exists(ns)
assert catalog._namespace_exists("db2") # `db2` exists because `db2.ns1` exists
assert catalog._namespace_exists("db3.ns1") # `db3.ns1` exists because `db3.ns1.ns2` exists
assert not catalog._namespace_exists("db_") # make sure '_' is escaped in the query
assert not catalog._namespace_exists("db%") # make sure '%' is escaped in the query
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_list_namespaces(catalog: SqlCatalog) -> None:
namespace_list = ["db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2", "db2.ns1", "db%"]
for namespace in namespace_list:
if not catalog._namespace_exists(namespace):
catalog.create_namespace(namespace)
ns_list = catalog.list_namespaces()
for ns in [("db",), ("db%",), ("db2",)]:
assert ns in ns_list
ns_list = catalog.list_namespaces("db")
assert sorted(ns_list) == [("db", "ns1"), ("db", "ns2")]
ns_list = catalog.list_namespaces("db.ns1")
assert sorted(ns_list) == [("db", "ns1", "ns2")]
ns_list = catalog.list_namespaces("db.ns1.ns2")
assert len(ns_list) == 0
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_list_namespaces_fuzzy_match(catalog: SqlCatalog) -> None:
namespace_list = ["db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3", "db_.ns1.ns2", "db2.ns1.ns2"]
for namespace in namespace_list:
if not catalog._namespace_exists(namespace):
catalog.create_namespace(namespace)
assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")]
assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")]
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_list_non_existing_namespaces(catalog: SqlCatalog) -> None:
with pytest.raises(NoSuchNamespaceError):
catalog.list_namespaces("does_not_exist")
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
assert catalog._namespace_exists(namespace)
catalog.create_table(table_identifier, table_schema_nested)
with pytest.raises(NamespaceNotEmptyError):
catalog.drop_namespace(namespace)
catalog.drop_table(table_identifier)
catalog.drop_namespace(namespace)
assert not catalog._namespace_exists(namespace)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_drop_non_existing_namespaces(catalog: SqlCatalog) -> None:
with pytest.raises(NoSuchNamespaceError):
catalog.drop_namespace("does_not_exist")
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
def test_load_namespace_properties(catalog: SqlCatalog, namespace: str) -> None:
warehouse_location = "/test/location"
test_properties = {
"comment": "this is a test description",
"location": f"{warehouse_location}/{namespace}",
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
catalog.create_namespace(namespace, test_properties)
listed_properties = catalog.load_namespace_properties(namespace)
for k, v in listed_properties.items():
assert k in test_properties
assert v == test_properties[k]
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
def test_load_empty_namespace_properties(catalog: SqlCatalog, namespace: str) -> None:
catalog.create_namespace(namespace)
listed_properties = catalog.load_namespace_properties(namespace)
assert listed_properties == {"exists": "true"}
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
def test_load_namespace_properties_non_existing_namespace(catalog: SqlCatalog) -> None:
with pytest.raises(NoSuchNamespaceError):
catalog.load_namespace_properties("does_not_exist")
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
def test_update_namespace_properties(catalog: SqlCatalog, namespace: str) -> None:
warehouse_location = "/test/location"
test_properties = {
"comment": "this is a test description",
"location": f"{warehouse_location}/{namespace}",
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}
catalog.create_namespace(namespace, test_properties)
update_report = catalog.update_namespace_properties(namespace, removals, updates)
for k in updates.keys():
assert k in update_report.updated
for k in removals:
if k == "should_not_removed":
assert k in update_report.missing
else:
assert k in update_report.removed
assert catalog.load_namespace_properties(namespace) == {
"comment": "updated test description",
"test_property4": "4",
"test_property5": "5",
"location": f"{warehouse_location}/{namespace}",
}
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
last_updated_ms = table.metadata.last_updated_ms
original_table_metadata_location = table.metadata_location
original_table_last_updated_ms = table.metadata.last_updated_ms
assert catalog._parse_metadata_version(table.metadata_location) == 0
assert table.metadata.current_schema_id == 0
transaction = table.transaction()
update = transaction.update_schema()
update.add_column(path="b", field_type=IntegerType())
update.commit()
transaction.commit_transaction()
updated_table_metadata = table.metadata
assert catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.current_schema_id == 1
assert len(updated_table_metadata.schemas) == 2
new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
assert updated_table_metadata.last_updated_ms > last_updated_ms
assert len(updated_table_metadata.metadata_log) == 1
assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location
assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
lazy_fixture("catalog_sqlite_fsspec"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_simple)
df = pa.Table.from_pydict(
{
"foo": ["a"],
"bar": [1],
"baz": [True],
},
schema=schema_to_pyarrow(table_schema_simple),
)
table.append(df)
# new snapshot is written in APPEND mode
assert len(table.metadata.snapshots) == 1
assert table.metadata.snapshots[0].snapshot_id == table.metadata.current_snapshot_id
assert table.metadata.snapshots[0].parent_snapshot_id is None
assert table.metadata.snapshots[0].sequence_number == 1
assert table.metadata.snapshots[0].summary is not None
assert table.metadata.snapshots[0].summary.operation == Operation.APPEND
assert table.metadata.snapshots[0].summary["added-data-files"] == "1"
assert table.metadata.snapshots[0].summary["added-records"] == "1"
assert table.metadata.snapshots[0].summary["total-data-files"] == "1"
assert table.metadata.snapshots[0].summary["total-records"] == "1"
assert len(table.metadata.metadata_log) == 1
# read back the data
assert df == table.scan().to_arrow()
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table_a = catalog.create_table(table_identifier, table_schema_simple)
table_b = catalog.load_table(table_identifier)
with table_a.update_schema() as update:
update.add_column(path="b", field_type=IntegerType())
with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 0, found 1"):
# This one should fail since it already has been updated
with table_b.update_schema() as update:
update.add_column(path="c", field_type=IntegerType())
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize("format_version", [1, 2])
def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None:
identifier = f"default.arrow_write_data_and_evolve_schema_v{format_version}"
try:
catalog.create_namespace("default")
except NamespaceAlreadyExistsError:
pass
try:
catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass
pa_table = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
},
schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
)
tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)})
pa_table_with_column = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
"bar": [19, None, 25],
},
schema=pa.schema(
[
pa.field("foo", pa.large_string(), nullable=True),
pa.field("bar", pa.int32(), nullable=True),
]
),
)
with tbl.transaction() as txn:
with txn.update_schema() as schema_txn:
schema_txn.union_by_name(pa_table_with_column.schema)
txn.append(pa_table_with_column)
txn.overwrite(pa_table_with_column)
txn.delete("foo = 'a'")
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize("format_version", [1, 2])
def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> None:
identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
try:
catalog.create_namespace("default")
except NamespaceAlreadyExistsError:
pass
try:
catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass
pa_table = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
},
schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
)
pa_table_with_column = pa.Table.from_pydict(
{
"foo": ["a", None, "z"],
"bar": [19, None, 25],
},
schema=pa.schema(
[
pa.field("foo", pa.large_string(), nullable=True),
pa.field("bar", pa.int32(), nullable=True),
]
),
)
with catalog.create_table_transaction(
identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)}
) as txn:
with txn.update_snapshot().fast_append() as snapshot_update:
for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table, io=txn._table.io):
snapshot_update.append_data_file(data_file)
with txn.update_schema() as schema_txn:
schema_txn.union_by_name(pa_table_with_column.schema)
with txn.update_snapshot().fast_append() as snapshot_update:
for data_file in _dataframe_to_data_files(
table_metadata=txn.table_metadata, df=pa_table_with_column, io=txn._table.io
):
snapshot_update.append_data_file(data_file)
tbl = catalog.load_table(identifier=identifier)
assert tbl.format_version == format_version
assert len(tbl.scan().to_arrow()) == 6
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None:
# table properties can be set to int, but still serialized to string
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
property_with_int = {"property_name": 42}
table = catalog.create_table(table_identifier, table_schema_simple, properties=property_with_int)
assert isinstance(table.properties["property_name"], str)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_table_properties_raise_for_none_value(
catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier
) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
property_with_none = {"property_name": None}
with pytest.raises(ValidationError) as exc_info:
_ = catalog.create_table(table_identifier, table_schema_simple, properties=property_with_none)
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
catalog.create_table(table_identifier, table_schema_simple, properties={"format-version": "2"})
existing_table = table_identifier
# Act and Assert for an existing table
assert catalog.table_exists(existing_table) is True
# Act and Assert for a non-existing table
assert catalog.table_exists(("non", "exist")) is False
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
],
)
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
# To catch manifest file name collision bug during merge:
# https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918
catalog.create_namespace_if_not_exists("default")
try:
catalog.drop_table("default.test_merge_manifest")
except NoSuchTableError:
pass
tbl = catalog.create_table(
"default.test_merge_manifest",
arrow_table_with_null.schema,
properties={
"commit.manifest-merge.enabled": "true",
"commit.manifest.min-count-to-merge": "2",
"format-version": format_version,
},
)
for _ in range(5):
tbl.append(arrow_table_with_null)
assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null)
current_snapshot = tbl.current_snapshot()
assert current_snapshot
manifests = current_snapshot.manifests(tbl.io)
assert len(manifests) == 1
@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, random_table_identifier: str) -> None:
namespace = Catalog.namespace_from(random_table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(random_table_identifier, table_schema_nested)
original_metadata_location = table.metadata_location
for i in range(5):
with table.transaction() as transaction:
with transaction.update_schema() as update:
update.add_column(path=f"new_column_{i}", field_type=IntegerType())
assert len(table.metadata.metadata_log) == 5
assert os.path.exists(original_metadata_location[len("file://") :])
# Set the max versions property to 2, and delete after commit
new_property = {
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2",
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true",
}
with table.transaction() as transaction:
transaction.set_properties(properties=new_property)
# Verify that only the most recent metadata files are kept
assert len(table.metadata.metadata_log) == 2
updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log
# new metadata log was added, so earlier metadata logs are removed.
with table.transaction() as transaction:
with transaction.update_schema() as update:
update.add_column(path="new_column_x", field_type=IntegerType())
assert len(table.metadata.metadata_log) == 2
assert not os.path.exists(original_metadata_location[len("file://") :])
assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :])
assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])
class TestSqlCatalogClose:
"""Test SqlCatalog close functionality."""
def test_sql_catalog_close(self, catalog_sqlite: SqlCatalog) -> None:
"""Test that SqlCatalog close method properly disposes the engine."""
# Verify engine exists
assert hasattr(catalog_sqlite, "engine")
# Close the catalog
catalog_sqlite.close()
# Verify engine is disposed by checking that the engine still exists
assert hasattr(catalog_sqlite, "engine")
def test_sql_catalog_context_manager(self, warehouse: Path) -> None:
"""Test that SqlCatalog works as a context manager."""
with SqlCatalog("test", uri="sqlite:///:memory:", warehouse=str(warehouse)) as catalog:
# Verify engine exists
assert hasattr(catalog, "engine")
# Create a namespace and table to test functionality
catalog.create_namespace("test_db")
schema = Schema(NestedField(1, "name", StringType(), required=True))
catalog.create_table(("test_db", "test_table"), schema)
# Verify engine is disposed after exiting context
assert hasattr(catalog, "engine")
def test_sql_catalog_context_manager_with_exception(self) -> None:
"""Test that SqlCatalog context manager properly closes even with exceptions."""
catalog = None
try:
with SqlCatalog("test", uri="sqlite:///:memory:") as cat:
catalog = cat
# Verify engine exists
assert hasattr(catalog, "engine")
raise ValueError("Test exception")
except ValueError:
pass
# Verify engine is disposed even after exception
assert catalog is not None
assert hasattr(catalog, "engine")
def test_sql_catalog_multiple_close_calls(self, catalog_sqlite: SqlCatalog) -> None:
"""Test that multiple close calls on SqlCatalog are safe."""
# First close
catalog_sqlite.close()
# Second close should not raise an exception
catalog_sqlite.close()