blob: 2ab97b4285e35931d860ff41eb8c04feb6117adc [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
from pathlib import PosixPath
from typing import Union
import pyarrow as pa
import pytest
from pydantic_core import ValidationError
from pytest_lazyfixture import lazy_fixture
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import WAREHOUSE
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
Table,
TableProperties,
)
from pyiceberg.table.update import (
AddSchemaUpdate,
SetCurrentSchemaUpdate,
)
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT, Properties
from pyiceberg.types import IntegerType, LongType, NestedField
@pytest.fixture
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"})
TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
TEST_TABLE_NAMESPACE = ("com", "organization", "department")
TEST_TABLE_NAME = "my_table"
TEST_TABLE_SCHEMA = Schema(
NestedField(1, "x", LongType(), required=True),
NestedField(2, "y", LongType(), doc="comment", required=True),
NestedField(3, "z", LongType(), required=True),
)
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
NO_SUCH_TABLE_ERROR = "Table does not exist: com.organization.department.my_table"
TABLE_ALREADY_EXISTS_ERROR = "Table com.organization.department.my_table already exists"
NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace \\('com', 'organization', 'department'\\) already exists"
# TODO: consolidate namespace error messages then remove this
DROP_NOT_EXISTING_NAMESPACE_ERROR = "Namespace does not exist: \\('com', 'organization', 'department'\\)"
NO_SUCH_NAMESPACE_ERROR = "Namespace com.organization.department does not exists"
NAMESPACE_NOT_EMPTY_ERROR = "Namespace com.organization.department is not empty"
def given_catalog_has_a_table(
catalog: InMemoryCatalog,
properties: Properties = EMPTY_DICT,
) -> Table:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
return catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=properties or TEST_TABLE_PROPERTIES,
)
def test_load_catalog_in_memory() -> None:
assert load_catalog("catalog", type="in-memory")
def test_load_catalog_impl_not_full_path() -> None:
with pytest.raises(ValueError) as exc_info:
load_catalog("catalog", **{"py-catalog-impl": "CustomCatalog"})
assert "py-catalog-impl should be full path (module.CustomCatalog), got: CustomCatalog" in str(exc_info.value)
def test_load_catalog_impl_does_not_exist() -> None:
with pytest.raises(ValueError) as exc_info:
load_catalog("catalog", **{"py-catalog-impl": "pyiceberg.does.not.exist.Catalog"})
assert "Could not initialize Catalog: pyiceberg.does.not.exist.Catalog" in str(exc_info.value)
def test_load_catalog_has_type_and_impl() -> None:
with pytest.raises(ValueError) as exc_info:
load_catalog("catalog", **{"py-catalog-impl": "pyiceberg.does.not.exist.Catalog", "type": "sql"})
assert (
"Must not set both catalog type and py-catalog-impl configurations, "
"but found type sql and py-catalog-impl pyiceberg.does.not.exist.Catalog" in str(exc_info.value)
)
def test_namespace_from_tuple() -> None:
# Given
identifier = ("com", "organization", "department", "my_table")
# When
namespace_from = Catalog.namespace_from(identifier)
# Then
assert namespace_from == ("com", "organization", "department")
def test_namespace_from_str() -> None:
# Given
identifier = "com.organization.department.my_table"
# When
namespace_from = Catalog.namespace_from(identifier)
# Then
assert namespace_from == ("com", "organization", "department")
def test_name_from_tuple() -> None:
# Given
identifier = ("com", "organization", "department", "my_table")
# When
name_from = Catalog.table_name_from(identifier)
# Then
assert name_from == "my_table"
def test_name_from_str() -> None:
# Given
identifier = "com.organization.department.my_table"
# When
name_from = Catalog.table_name_from(identifier)
# Then
assert name_from == "my_table"
def test_create_table(catalog: InMemoryCatalog) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
new_location = f"{catalog._warehouse_location}/new_location"
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=new_location,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
assert table.location() == new_location
def test_create_table_removes_trailing_slash_from_location(catalog: InMemoryCatalog) -> None:
new_location = f"{catalog._warehouse_location}/new_location"
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=f"{new_location}/",
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
assert table.location() == new_location
@pytest.mark.parametrize(
"schema,expected",
[
(lazy_fixture("pyarrow_schema_simple_without_ids"), lazy_fixture("iceberg_schema_simple_no_ids")),
(lazy_fixture("iceberg_schema_simple"), lazy_fixture("iceberg_schema_simple")),
(lazy_fixture("iceberg_schema_nested"), lazy_fixture("iceberg_schema_nested")),
(lazy_fixture("pyarrow_schema_nested_without_ids"), lazy_fixture("iceberg_schema_nested_no_ids")),
],
)
def test_convert_schema_if_needed(
schema: Union[Schema, pa.Schema],
expected: Schema,
catalog: InMemoryCatalog,
) -> None:
assert expected == catalog._convert_schema_if_needed(schema)
def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_simple_without_ids: pa.Schema) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=pyarrow_schema_simple_without_ids,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
def test_create_table_raises_error_when_table_already_exists(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
# When
with pytest.raises(TableAlreadyExistsError, match=TABLE_ALREADY_EXISTS_ERROR):
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
)
def test_load_table(catalog: InMemoryCatalog) -> None:
# Given
given_table = given_catalog_has_a_table(catalog)
# When
table = catalog.load_table(TEST_TABLE_IDENTIFIER)
# Then
assert table == given_table
def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
# Given
given_table = given_catalog_has_a_table(catalog)
# When
intermediate = catalog.load_table(TEST_TABLE_IDENTIFIER)
table = catalog.load_table(intermediate._identifier)
# Then
assert table == given_table
def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
def test_table_exists(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
# Then
assert catalog.table_exists(TEST_TABLE_IDENTIFIER)
def test_table_exists_on_table_not_found(catalog: InMemoryCatalog) -> None:
assert not catalog.table_exists(TEST_TABLE_IDENTIFIER)
def test_drop_table(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
# When
catalog.drop_table(TEST_TABLE_IDENTIFIER)
# Then
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
# Given
table = given_catalog_has_a_table(catalog)
# When
catalog.drop_table(table._identifier)
# Then
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(table._identifier)
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
def test_purge_table(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
# When
catalog.purge_table(TEST_TABLE_IDENTIFIER)
# Then
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
def test_rename_table(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
# When
new_table = "new.namespace.new_table"
catalog.create_namespace(("new", "namespace"))
table = catalog.rename_table(TEST_TABLE_IDENTIFIER, new_table)
# Then
assert table._identifier == Catalog.identifier_to_tuple(new_table)
# And
table = catalog.load_table(new_table)
assert table._identifier == Catalog.identifier_to_tuple(new_table)
# And
assert catalog._namespace_exists(table._identifier[:-1])
# And
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
# Given
table = given_catalog_has_a_table(catalog)
# When
new_table_name = "new.namespace.new_table"
catalog.create_namespace(("new", "namespace"))
new_table = catalog.rename_table(table._identifier, new_table_name)
# Then
assert new_table._identifier == Catalog.identifier_to_tuple(new_table_name)
# And
new_table = catalog.load_table(new_table._identifier)
assert new_table._identifier == Catalog.identifier_to_tuple(new_table_name)
# And
assert catalog._namespace_exists(new_table._identifier[:-1])
# And
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(table._identifier)
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
def test_create_namespace(catalog: InMemoryCatalog) -> None:
# When
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
# Then
assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
assert TEST_TABLE_PROPERTIES == catalog.load_namespace_properties(TEST_TABLE_NAMESPACE)
def test_create_namespace_raises_error_on_existing_namespace(catalog: InMemoryCatalog) -> None:
# Given
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
# When
with pytest.raises(NamespaceAlreadyExistsError, match=NAMESPACE_ALREADY_EXISTS_ERROR):
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
def test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR):
catalog.load_namespace_properties(TEST_TABLE_NAMESPACE)
def test_list_namespaces(catalog: InMemoryCatalog) -> None:
# Given
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
# When
namespaces = catalog.list_namespaces()
# Then
assert TEST_TABLE_NAMESPACE[:1] in namespaces
# When
namespaces = catalog.list_namespaces(TEST_TABLE_NAMESPACE)
# Then
assert not namespaces
def test_drop_namespace(catalog: InMemoryCatalog) -> None:
# Given
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
# When
catalog.drop_namespace(TEST_TABLE_NAMESPACE)
# Then
assert not catalog._namespace_exists(TEST_TABLE_NAMESPACE)
def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchNamespaceError, match=DROP_NOT_EXISTING_NAMESPACE_ERROR):
catalog.drop_namespace(TEST_TABLE_NAMESPACE)
def test_drop_namespace_raises_error_when_namespace_not_empty(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
# When
with pytest.raises(NamespaceNotEmptyError, match=NAMESPACE_NOT_EMPTY_ERROR):
catalog.drop_namespace(TEST_TABLE_NAMESPACE)
def test_list_tables(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
# When
tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
# Then
assert tables
assert TEST_TABLE_IDENTIFIER in tables
def test_list_tables_under_a_namespace(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
new_namespace = ("new", "namespace")
catalog.create_namespace(new_namespace)
# When
all_tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
new_namespace_tables = catalog.list_tables(new_namespace)
# Then
assert all_tables
assert TEST_TABLE_IDENTIFIER in all_tables
assert new_namespace_tables == []
def test_update_namespace_metadata(catalog: InMemoryCatalog) -> None:
# Given
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
# When
new_metadata = {"key3": "value3", "key4": "value4"}
summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, updates=new_metadata)
# Then
assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
assert new_metadata.items() <= catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
assert summary.removed == []
assert sorted(summary.updated) == ["key3", "key4"]
assert summary.missing == []
def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None:
# Given
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
# When
new_metadata = {"key3": "value3", "key4": "value4"}
remove_metadata = {"key1"}
summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, remove_metadata, new_metadata)
# Then
assert catalog._namespace_exists(TEST_TABLE_NAMESPACE)
assert new_metadata.items() <= catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
assert remove_metadata.isdisjoint(catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).keys())
assert summary.removed == ["key1"]
assert sorted(summary.updated) == ["key3", "key4"]
assert summary.missing == []
def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR):
catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, updates=TEST_TABLE_PROPERTIES)
def test_commit_table(catalog: InMemoryCatalog) -> None:
# Given
given_table = given_catalog_has_a_table(catalog)
new_schema = Schema(
NestedField(1, "x", LongType()),
NestedField(2, "y", LongType(), doc="comment"),
NestedField(3, "z", LongType()),
NestedField(4, "add", LongType()),
)
# When
response = given_table.catalog.commit_table(
given_table,
updates=(
AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id),
SetCurrentSchemaUpdate(schema_id=-1),
),
requirements=(),
)
# Then
assert response.metadata.table_uuid == given_table.metadata.table_uuid
assert len(response.metadata.schemas) == 2
assert response.metadata.schemas[1] == new_schema
assert response.metadata.current_schema_id == new_schema.schema_id
def test_add_column(catalog: InMemoryCatalog) -> None:
given_table = given_catalog_has_a_table(catalog)
given_table.update_schema().add_column(path="new_column1", field_type=IntegerType()).commit()
assert given_table.schema() == Schema(
NestedField(field_id=1, name="x", field_type=LongType(), required=True),
NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"),
NestedField(field_id=3, name="z", field_type=LongType(), required=True),
NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False),
identifier_field_ids=[],
)
assert given_table.schema().schema_id == 1
transaction = given_table.transaction()
transaction.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit()
transaction.commit_transaction()
assert given_table.schema() == Schema(
NestedField(field_id=1, name="x", field_type=LongType(), required=True),
NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"),
NestedField(field_id=3, name="z", field_type=LongType(), required=True),
NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False),
NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc"),
identifier_field_ids=[],
)
assert given_table.schema().schema_id == 2
def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:
given_table = given_catalog_has_a_table(catalog)
with given_table.update_schema() as tx:
tx.add_column(path="new_column1", field_type=IntegerType())
assert given_table.schema() == Schema(
NestedField(field_id=1, name="x", field_type=LongType(), required=True),
NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"),
NestedField(field_id=3, name="z", field_type=LongType(), required=True),
NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False),
identifier_field_ids=[],
)
assert given_table.schema().schema_id == 1
with given_table.transaction() as tx:
tx.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit()
assert given_table.schema() == Schema(
NestedField(field_id=1, name="x", field_type=LongType(), required=True),
NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"),
NestedField(field_id=3, name="z", field_type=LongType(), required=True),
NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False),
NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc"),
identifier_field_ids=[],
)
assert given_table.schema().schema_id == 2
def test_catalog_repr(catalog: InMemoryCatalog) -> None:
s = repr(catalog)
assert s == "test.in_memory.catalog (<class 'pyiceberg.catalog.memory.InMemoryCatalog'>)"
def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
# table properties can be set to int, but still serialized to string
property_with_int = {"property_name": 42}
given_table = given_catalog_has_a_table(catalog, properties=property_with_int)
assert isinstance(given_table.properties["property_name"], str)
def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None:
property_with_none = {"property_name": None}
with pytest.raises(ValidationError) as exc_info:
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> None:
metadata_path = f"{catalog._warehouse_location}/custom/path"
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties={TableProperties.WRITE_METADATA_PATH: metadata_path},
)
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
table.append(df)
manifests = table.current_snapshot().manifests(table.io) # type: ignore
location_provider = table.location_provider()
assert location_provider.new_metadata_location("").startswith(metadata_path)
assert manifests[0].manifest_path.startswith(metadata_path)
assert table.location() != metadata_path
assert table.metadata_location.startswith(metadata_path)
def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
metadata_path = f"{table.location()}/metadata"
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
table.append(df)
manifests = table.current_snapshot().manifests(table.io) # type: ignore
location_provider = table.location_provider()
assert location_provider.new_metadata_location("").startswith(metadata_path)
assert manifests[0].manifest_path.startswith(metadata_path)
assert table.metadata_location.startswith(metadata_path)
def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)
initial_metadata_path = f"{table.location()}/metadata"
assert table.location_provider().new_metadata_location("metadata.json") == f"{initial_metadata_path}/metadata.json"
# update table with new path for metadata
new_metadata_path = f"{table.location()}/custom/path"
table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction()
assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json"