# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
from datetime import date, datetime, time, timezone
from decimal import Decimal
from typing import Any
from uuid import UUID

import pytest

from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.exceptions import CommitFailedException, NoSuchTableError, ValidationError
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema, prune_columns
from pyiceberg.table import Table, TableProperties
from pyiceberg.table.name_mapping import MappedField, NameMapping, create_mapping_from_schema
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.table.update.schema import UpdateSchema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT, Properties
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    FixedType,
    FloatType,
    IntegerType,
    ListType,
    LongType,
    MapType,
    NestedField,
    PrimitiveType,
    StringType,
    StructType,
    TimestampType,
    TimestamptzType,
    TimeType,
    UUIDType,
)


@pytest.fixture()
def catalog() -> Catalog:
    return load_catalog(
        "local",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )


@pytest.fixture()
def simple_table(catalog: Catalog, table_schema_simple: Schema) -> Table:
    return _create_table_with_schema(catalog, table_schema_simple)


def _create_table_with_schema(catalog: Catalog, schema: Schema, properties: Properties = EMPTY_DICT) -> Table:
    tbl_name = "default.test_schema_evolution"
    try:
        catalog.drop_table(tbl_name)
    except NoSuchTableError:
        pass
    return catalog.create_table(
        identifier=tbl_name,
        schema=schema,
        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json(), **properties},
    )


@pytest.mark.integration
def test_add_already_exists(catalog: Catalog, table_schema_nested: Schema) -> None:
    table = _create_table_with_schema(catalog, table_schema_nested)
    update = table.update_schema()

    with pytest.raises(ValueError) as exc_info:
        update.add_column("foo", IntegerType())
    assert "already exists: foo" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        update.add_column(path=("location", "latitude"), field_type=IntegerType())
    assert "already exists: location.latitude" in str(exc_info.value)


@pytest.mark.integration
def test_add_to_non_struct_type(catalog: Catalog, table_schema_simple: Schema) -> None:
    table = _create_table_with_schema(catalog, table_schema_simple)
    update = table.update_schema()
    with pytest.raises(ValueError) as exc_info:
        update.add_column(path=("foo", "lat"), field_type=IntegerType())
    assert "Cannot add column 'lat' to non-struct type: foo" in str(exc_info.value)


@pytest.mark.integration
def test_schema_evolution_nested_field(catalog: Catalog) -> None:
    schema = Schema(
        NestedField(
            field_id=1,
            name="foo",
            field_type=StructType(NestedField(2, name="bar", field_type=StringType(), required=False)),
            required=False,
        ),
    )
    tbl = _create_table_with_schema(catalog, schema)

    assert tbl.schema() == schema

    with pytest.raises(ValidationError) as exc_info:
        with tbl.transaction() as tx:
            tx.update_schema().update_column("foo", StringType()).commit()

    assert "Cannot change column type: struct<2: bar: optional string> is not a primitive" in str(exc_info.value)


@pytest.mark.integration
def test_schema_evolution_via_transaction(catalog: Catalog) -> None:
    schema = Schema(
        NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
        NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
    )
    tbl = _create_table_with_schema(catalog, schema)

    assert tbl.schema() == schema

    with tbl.transaction() as tx:
        tx.update_schema().add_column("col_string", StringType()).commit()

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
        NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
        NestedField(field_id=3, name="col_string", field_type=StringType(), required=False),
    )

    tbl.update_schema().add_column("col_integer", IntegerType()).commit()

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
        NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
        NestedField(field_id=3, name="col_string", field_type=StringType(), required=False),
        NestedField(field_id=4, name="col_integer", field_type=IntegerType(), required=False),
    )

    with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 2, found 3"):
        with tbl.transaction() as tx:
            # Start a new update
            schema_update = tx.update_schema()

            # Do a concurrent update
            tbl.update_schema().add_column("col_long", LongType()).commit()

            # stage another update in the transaction
            schema_update.add_column("col_double", DoubleType()).commit()

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
        NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
        NestedField(field_id=3, name="col_string", field_type=StringType(), required=False),
        NestedField(field_id=4, name="col_integer", field_type=IntegerType(), required=False),
        NestedField(field_id=5, name="col_long", field_type=LongType(), required=False),
    )


@pytest.mark.integration
def test_schema_evolution_nested(catalog: Catalog) -> None:
    nested_schema = Schema(
        NestedField(
            field_id=1,
            name="location_lookup",
            field_type=MapType(
                key_id=10,
                key_type=StringType(),
                value_id=11,
                value_type=StructType(
                    NestedField(field_id=110, name="x", field_type=FloatType(), required=False),
                    NestedField(field_id=111, name="y", field_type=FloatType(), required=False),
                ),
                element_required=True,
            ),
            required=True,
        ),
        NestedField(
            field_id=2,
            name="locations",
            field_type=ListType(
                element_id=20,
                element_type=StructType(
                    NestedField(field_id=200, name="x", field_type=FloatType(), required=False),
                    NestedField(field_id=201, name="y", field_type=FloatType(), required=False),
                ),
                element_required=True,
            ),
            required=True,
        ),
        NestedField(
            field_id=3,
            name="person",
            field_type=StructType(
                NestedField(field_id=30, name="name", field_type=StringType(), required=False),
                NestedField(field_id=31, name="age", field_type=IntegerType(), required=True),
            ),
            required=False,
        ),
    )

    tbl = _create_table_with_schema(catalog, nested_schema)

    assert tbl.schema().highest_field_id == 12

    with tbl.update_schema() as schema_update:
        schema_update.add_column(("location_lookup", "z"), FloatType())
        schema_update.add_column(("locations", "z"), FloatType())
        schema_update.add_column(("person", "address"), StringType())

    assert str(tbl.schema()) == str(
        Schema(
            NestedField(
                field_id=1,
                name="location_lookup",
                field_type=MapType(
                    type="map",
                    key_id=4,
                    key_type=StringType(),
                    value_id=5,
                    value_type=StructType(
                        NestedField(field_id=6, name="x", field_type=FloatType(), required=False),
                        NestedField(field_id=7, name="y", field_type=FloatType(), required=False),
                        NestedField(field_id=13, name="z", field_type=FloatType(), required=False),
                    ),
                    value_required=True,
                ),
                required=True,
            ),
            NestedField(
                field_id=2,
                name="locations",
                field_type=ListType(
                    type="list",
                    element_id=8,
                    element_type=StructType(
                        NestedField(field_id=9, name="x", field_type=FloatType(), required=False),
                        NestedField(field_id=10, name="y", field_type=FloatType(), required=False),
                        NestedField(field_id=14, name="z", field_type=FloatType(), required=False),
                    ),
                    element_required=True,
                ),
                required=True,
            ),
            NestedField(
                field_id=3,
                name="person",
                field_type=StructType(
                    NestedField(field_id=11, name="name", field_type=StringType(), required=False),
                    NestedField(field_id=12, name="age", field_type=IntegerType(), required=True),
                    NestedField(field_id=15, name="address", field_type=StringType(), required=False),
                ),
                required=False,
            ),
        )
    )


schema_nested = Schema(
    NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
    NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
    NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
    NestedField(
        field_id=4,
        name="qux",
        field_type=ListType(type="list", element_id=8, element_type=StringType(), element_required=True),
        required=True,
    ),
    NestedField(
        field_id=5,
        name="quux",
        field_type=MapType(
            type="map",
            key_id=9,
            key_type=StringType(),
            value_id=10,
            value_type=MapType(
                type="map", key_id=11, key_type=StringType(), value_id=12, value_type=IntegerType(), value_required=True
            ),
            value_required=True,
        ),
        required=True,
    ),
    NestedField(
        field_id=6,
        name="location",
        field_type=ListType(
            type="list",
            element_id=13,
            element_type=StructType(
                NestedField(field_id=14, name="latitude", field_type=FloatType(), required=False),
                NestedField(field_id=15, name="longitude", field_type=FloatType(), required=False),
            ),
            element_required=True,
        ),
        required=True,
    ),
    NestedField(
        field_id=7,
        name="person",
        field_type=StructType(
            NestedField(field_id=16, name="name", field_type=StringType(), required=False),
            NestedField(field_id=17, name="age", field_type=IntegerType(), required=True),
        ),
        required=False,
    ),
    identifier_field_ids=[2],
)


@pytest.fixture()
def nested_table(catalog: Catalog) -> Table:
    return _create_table_with_schema(catalog, schema_nested)


@pytest.mark.integration
def test_no_changes(simple_table: Table, table_schema_simple: Schema) -> None:
    with simple_table.update_schema() as _:
        pass

    assert simple_table.schema() == table_schema_simple


@pytest.mark.integration
def test_no_changes_empty_commit(simple_table: Table, table_schema_simple: Schema) -> None:
    with simple_table.update_schema() as update:
        # No updates, so this should be a noop
        update.update_column(path="foo")

    assert simple_table.schema() == table_schema_simple


@pytest.mark.integration
def test_revert_changes(simple_table: Table, table_schema_simple: Schema) -> None:
    with simple_table.update_schema() as update:
        update.add_column(path="data", field_type=IntegerType(), required=False)

    with simple_table.update_schema(allow_incompatible_changes=True) as update:
        update.delete_column(path="data")

    assert simple_table.schemas() == {
        0: Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
            NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
            NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
            identifier_field_ids=[2],
        ),
        1: Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
            NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
            NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
            NestedField(field_id=4, name="data", field_type=IntegerType(), required=False),
            identifier_field_ids=[2],
        ),
    }
    assert simple_table.schema().schema_id == 0
    assert simple_table.schemas()[0].schema_id == 0
    assert simple_table.schemas()[1].schema_id == 1


@pytest.mark.integration
def test_delete_field(simple_table: Table) -> None:
    with simple_table.update_schema() as schema_update:
        schema_update.delete_column("foo")

    assert simple_table.schema() == Schema(
        # foo is missing 👍
        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
        identifier_field_ids=[2],
    )


@pytest.mark.integration
def test_delete_field_case_insensitive(simple_table: Table) -> None:
    with simple_table.update_schema(case_sensitive=False) as schema_update:
        schema_update.delete_column("FOO")

    assert simple_table.schema() == Schema(
        # foo is missing 👍
        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
        identifier_field_ids=[2],
    )


@pytest.mark.integration
def test_delete_identifier_fields(simple_table: Table) -> None:
    with pytest.raises(ValueError) as exc_info:
        with simple_table.update_schema() as schema_update:
            schema_update.delete_column("bar")

    assert "Cannot find identifier field bar. In case of deletion, update the identifier fields first." in str(exc_info)


@pytest.mark.integration
def test_delete_identifier_fields_nested(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
            NestedField(
                field_id=2,
                name="person",
                field_type=StructType(
                    NestedField(field_id=3, name="name", field_type=StringType(), required=True),
                    NestedField(field_id=4, name="age", field_type=IntegerType(), required=True),
                ),
                required=True,
            ),
            identifier_field_ids=[3],
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.delete_column("person")

    assert "Cannot find identifier field person.name. In case of deletion, update the identifier fields first." in str(exc_info)


@pytest.mark.parametrize(
    "field",
    [
        "foo",
        "baz",
        "qux",
        "quux",
        "location",
        "location.element.latitude",
        "location.element.longitude",
        "person",
        "person.name",
        "person.age",
    ],
)
@pytest.mark.integration
def test_deletes(field: str, nested_table: Table) -> None:
    with nested_table.update_schema() as schema_update:
        schema_update.delete_column(field)

    selected_ids = {
        field_id
        for field_id in schema_nested.field_ids
        if not isinstance(schema_nested.find_field(field_id).field_type, (MapType, ListType))
        and not schema_nested.find_column_name(field_id).startswith(field)  # type: ignore
    }
    expected_schema = prune_columns(schema_nested, selected_ids, select_full_types=False)

    assert expected_schema == nested_table.schema()


@pytest.mark.parametrize(
    "field",
    [
        "Foo",
        "Baz",
        "Qux",
        "Quux",
        "Location",
        "Location.element.latitude",
        "Location.element.longitude",
        "Person",
        "Person.name",
        "Person.age",
    ],
)
@pytest.mark.integration
def test_deletes_case_insensitive(field: str, nested_table: Table) -> None:
    with nested_table.update_schema(case_sensitive=False) as schema_update:
        schema_update.delete_column(field)

    selected_ids = {
        field_id
        for field_id in schema_nested.field_ids
        if not isinstance(schema_nested.find_field(field_id).field_type, (MapType, ListType))
        and not schema_nested.find_column_name(field_id).startswith(field.lower())  # type: ignore
    }
    expected_schema = prune_columns(schema_nested, selected_ids, select_full_types=False)

    assert expected_schema == nested_table.schema()


@pytest.mark.integration
def test_update_types(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="bar", field_type=IntegerType(), required=True),
            NestedField(
                field_id=2,
                name="location",
                field_type=ListType(
                    type="list",
                    element_id=3,
                    element_type=StructType(
                        NestedField(field_id=4, name="latitude", field_type=FloatType(), required=False),
                        NestedField(field_id=5, name="longitude", field_type=FloatType(), required=False),
                    ),
                    element_required=True,
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.update_column("bar", LongType())
        schema_update.update_column("location.latitude", DoubleType())
        schema_update.update_column("location.longitude", DoubleType())

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="bar", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="location",
            field_type=ListType(
                type="list",
                element_id=3,
                element_type=StructType(
                    NestedField(field_id=4, name="latitude", field_type=DoubleType(), required=False),
                    NestedField(field_id=5, name="longitude", field_type=DoubleType(), required=False),
                ),
                element_required=True,
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_update_types_case_insensitive(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="bar", field_type=IntegerType(), required=True),
            NestedField(
                field_id=2,
                name="location",
                field_type=ListType(
                    type="list",
                    element_id=3,
                    element_type=StructType(
                        NestedField(field_id=4, name="latitude", field_type=FloatType(), required=False),
                        NestedField(field_id=5, name="longitude", field_type=FloatType(), required=False),
                    ),
                    element_required=True,
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema(case_sensitive=False) as schema_update:
        schema_update.update_column("baR", LongType())
        schema_update.update_column("Location.Latitude", DoubleType())
        schema_update.update_column("Location.Longitude", DoubleType())

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="bar", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="location",
            field_type=ListType(
                type="list",
                element_id=3,
                element_type=StructType(
                    NestedField(field_id=4, name="latitude", field_type=DoubleType(), required=False),
                    NestedField(field_id=5, name="longitude", field_type=DoubleType(), required=False),
                ),
                element_required=True,
            ),
            required=True,
        ),
    )


allowed_promotions = [
    (StringType(), BinaryType()),
    (BinaryType(), StringType()),
    (IntegerType(), LongType()),
    (FloatType(), DoubleType()),
    (DecimalType(9, 2), DecimalType(18, 2)),
]


@pytest.mark.parametrize("from_type, to_type", allowed_promotions, ids=str)
@pytest.mark.integration
def test_allowed_updates(from_type: PrimitiveType, to_type: PrimitiveType, catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="bar", field_type=from_type, required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.update_column("bar", to_type)

    assert tbl.schema() == Schema(NestedField(field_id=1, name="bar", field_type=to_type, required=True))


disallowed_promotions_types = [
    BooleanType(),
    IntegerType(),
    LongType(),
    FloatType(),
    DoubleType(),
    DateType(),
    TimeType(),
    TimestampType(),
    TimestamptzType(),
    StringType(),
    UUIDType(),
    BinaryType(),
    FixedType(3),
    FixedType(4),
    # We'll just allow Decimal promotions right now
    # https://github.com/apache/iceberg/issues/8389
    # DecimalType(9, 2),
    # DecimalType(9, 3),
    DecimalType(18, 2),
]


@pytest.mark.parametrize("from_type", disallowed_promotions_types, ids=str)
@pytest.mark.parametrize("to_type", disallowed_promotions_types, ids=str)
@pytest.mark.integration
def test_disallowed_updates(from_type: PrimitiveType, to_type: PrimitiveType, catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="bar", field_type=from_type, required=True),
        ),
    )

    if from_type != to_type and (from_type, to_type) not in allowed_promotions:
        with pytest.raises(ValidationError) as exc_info:
            with tbl.update_schema() as schema_update:
                schema_update.update_column("bar", to_type)

        assert str(exc_info.value).startswith("Cannot change column type: bar:")
    else:
        with tbl.update_schema() as schema_update:
            schema_update.update_column("bar", to_type)

        assert tbl.schema() == Schema(
            NestedField(field_id=1, name="bar", field_type=to_type, required=True),
        )


@pytest.mark.integration
def test_rename_simple(simple_table: Table) -> None:
    with simple_table.update_schema() as schema_update:
        schema_update.rename_column("foo", "vo")

    with simple_table.transaction() as txn:
        with txn.update_schema() as schema_update:
            schema_update.rename_column("bar", "var")

    assert simple_table.schema() == Schema(
        NestedField(field_id=1, name="vo", field_type=StringType(), required=False),
        NestedField(field_id=2, name="var", field_type=IntegerType(), required=True),
        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
        identifier_field_ids=[2],
    )

    # Check that the name mapping gets updated
    assert simple_table.name_mapping() == NameMapping(
        [
            MappedField(field_id=1, names=["foo", "vo"]),
            MappedField(field_id=2, names=["bar", "var"]),
            MappedField(field_id=3, names=["baz"]),
        ]
    )


@pytest.mark.integration
def test_rename_simple_nested(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(
                field_id=1,
                name="foo",
                field_type=StructType(NestedField(field_id=2, name="bar", field_type=StringType())),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.rename_column("foo.bar", "vo")

    assert tbl.schema() == Schema(
        NestedField(
            field_id=1,
            name="foo",
            field_type=StructType(NestedField(field_id=2, name="vo", field_type=StringType())),
            required=True,
        ),
    )

    # Check that the name mapping gets updated
    assert tbl.name_mapping() == NameMapping(
        [
            MappedField(field_id=1, names=["foo"], fields=[MappedField(field_id=2, names=["bar", "vo"])]),
        ]
    )


@pytest.mark.integration
def test_rename_simple_nested_with_dots(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(
                field_id=1,
                name="a.b",
                field_type=StructType(NestedField(field_id=2, name="c.d", field_type=StringType())),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.rename_column(("a.b", "c.d"), "e.f")

    assert tbl.schema() == Schema(
        NestedField(
            field_id=1,
            name="a.b",
            field_type=StructType(NestedField(field_id=2, name="e.f", field_type=StringType())),
            required=True,
        ),
    )


@pytest.mark.integration
def test_rename(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(
                field_id=1,
                name="location_lookup",
                field_type=MapType(
                    type="map",
                    key_id=5,
                    key_type=StringType(),
                    value_id=6,
                    value_type=StructType(
                        NestedField(field_id=7, name="x", field_type=FloatType(), required=False),
                        NestedField(field_id=8, name="y", field_type=FloatType(), required=False),
                    ),
                    value_required=True,
                ),
                required=True,
            ),
            NestedField(
                field_id=2,
                name="locations",
                field_type=ListType(
                    type="list",
                    element_id=9,
                    element_type=StructType(
                        NestedField(field_id=10, name="x", field_type=FloatType(), required=False),
                        NestedField(field_id=11, name="y", field_type=FloatType(), required=False),
                    ),
                    element_required=True,
                ),
                required=True,
            ),
            NestedField(
                field_id=3,
                name="person",
                field_type=StructType(
                    NestedField(field_id=12, name="name", field_type=StringType(), required=False),
                    NestedField(field_id=13, name="leeftijd", field_type=IntegerType(), required=True),
                ),
                required=False,
            ),
            NestedField(field_id=4, name="foo", field_type=StringType(), required=True),
            identifier_field_ids=[],
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.rename_column("foo", "bar")
        schema_update.rename_column("location_lookup.x", "latitude")
        schema_update.rename_column("locations.x", "latitude")
        schema_update.rename_column("person.leeftijd", "age")

    assert tbl.schema() == Schema(
        NestedField(
            field_id=1,
            name="location_lookup",
            field_type=MapType(
                type="map",
                key_id=5,
                key_type=StringType(),
                value_id=6,
                value_type=StructType(
                    NestedField(field_id=7, name="latitude", field_type=FloatType(), required=False),
                    NestedField(field_id=8, name="y", field_type=FloatType(), required=False),
                ),
                value_required=True,
            ),
            required=True,
        ),
        NestedField(
            field_id=2,
            name="locations",
            field_type=ListType(
                type="list",
                element_id=9,
                element_type=StructType(
                    NestedField(field_id=10, name="latitude", field_type=FloatType(), required=False),
                    NestedField(field_id=11, name="y", field_type=FloatType(), required=False),
                ),
                element_required=True,
            ),
            required=True,
        ),
        NestedField(
            field_id=3,
            name="person",
            field_type=StructType(
                NestedField(field_id=12, name="name", field_type=StringType(), required=False),
                NestedField(field_id=13, name="age", field_type=IntegerType(), required=True),
            ),
            required=False,
        ),
        NestedField(field_id=4, name="bar", field_type=StringType(), required=True),
        identifier_field_ids=[],
    )


@pytest.mark.integration
def test_rename_case_insensitive(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(
                field_id=1,
                name="location_lookup",
                field_type=MapType(
                    type="map",
                    key_id=5,
                    key_type=StringType(),
                    value_id=6,
                    value_type=StructType(
                        NestedField(field_id=7, name="x", field_type=FloatType(), required=False),
                        NestedField(field_id=8, name="y", field_type=FloatType(), required=False),
                    ),
                    value_required=True,
                ),
                required=True,
            ),
            NestedField(
                field_id=2,
                name="locations",
                field_type=ListType(
                    type="list",
                    element_id=9,
                    element_type=StructType(
                        NestedField(field_id=10, name="x", field_type=FloatType(), required=False),
                        NestedField(field_id=11, name="y", field_type=FloatType(), required=False),
                    ),
                    element_required=True,
                ),
                required=True,
            ),
            NestedField(
                field_id=3,
                name="person",
                field_type=StructType(
                    NestedField(field_id=12, name="name", field_type=StringType(), required=False),
                    NestedField(field_id=13, name="leeftijd", field_type=IntegerType(), required=True),
                ),
                required=True,
            ),
            NestedField(field_id=4, name="foo", field_type=StringType(), required=True),
            identifier_field_ids=[13],
        ),
    )

    with tbl.update_schema(case_sensitive=False) as schema_update:
        schema_update.rename_column("Foo", "bar")
        schema_update.rename_column("Location_lookup.X", "latitude")
        schema_update.rename_column("Locations.X", "latitude")
        schema_update.rename_column("Person.Leeftijd", "age")

    assert tbl.schema() == Schema(
        NestedField(
            field_id=1,
            name="location_lookup",
            field_type=MapType(
                type="map",
                key_id=5,
                key_type=StringType(),
                value_id=6,
                value_type=StructType(
                    NestedField(field_id=7, name="latitude", field_type=FloatType(), required=False),
                    NestedField(field_id=8, name="y", field_type=FloatType(), required=False),
                ),
                value_required=True,
            ),
            required=True,
        ),
        NestedField(
            field_id=2,
            name="locations",
            field_type=ListType(
                type="list",
                element_id=9,
                element_type=StructType(
                    NestedField(field_id=10, name="latitude", field_type=FloatType(), required=False),
                    NestedField(field_id=11, name="y", field_type=FloatType(), required=False),
                ),
                element_required=True,
            ),
            required=True,
        ),
        NestedField(
            field_id=3,
            name="person",
            field_type=StructType(
                NestedField(field_id=12, name="name", field_type=StringType(), required=False),
                NestedField(field_id=13, name="age", field_type=IntegerType(), required=True),
            ),
            required=True,
        ),
        NestedField(field_id=4, name="bar", field_type=StringType(), required=True),
        identifier_field_ids=[13],
    )


@pytest.mark.integration
def test_add_struct(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType()),
        ),
    )

    struct = StructType(
        NestedField(field_id=3, name="x", field_type=DoubleType(), required=False),
        NestedField(field_id=4, name="y", field_type=DoubleType(), required=False),
    )

    with tbl.update_schema() as schema_update:
        schema_update.add_column("location", struct)

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="foo", field_type=StringType()),
        NestedField(field_id=2, name="location", field_type=struct, required=False),
    )


@pytest.mark.integration
def test_add_nested_map_of_structs(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    map_type_example = MapType(
        key_id=1,
        value_id=2,
        key_type=StructType(
            NestedField(field_id=20, name="address", field_type=StringType(), required=True),
            NestedField(field_id=21, name="city", field_type=StringType(), required=True),
            NestedField(field_id=22, name="state", field_type=StringType(), required=True),
            NestedField(field_id=23, name="zip", field_type=IntegerType(), required=True),
        ),
        value_type=StructType(
            NestedField(field_id=9, name="lat", field_type=DoubleType(), required=True),
            NestedField(field_id=8, name="long", field_type=DoubleType(), required=False),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.add_column("locations", map_type_example)

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        NestedField(
            field_id=2,
            name="locations",
            field_type=MapType(
                type="map",
                key_id=3,
                key_type=StructType(
                    NestedField(field_id=5, name="address", field_type=StringType(), required=True),
                    NestedField(field_id=6, name="city", field_type=StringType(), required=True),
                    NestedField(field_id=7, name="state", field_type=StringType(), required=True),
                    NestedField(field_id=8, name="zip", field_type=IntegerType(), required=True),
                ),
                value_id=4,
                value_type=StructType(
                    NestedField(field_id=9, name="lat", field_type=DoubleType(), required=True),
                    NestedField(field_id=10, name="long", field_type=DoubleType(), required=False),
                ),
                value_required=True,
            ),
            required=False,
        ),
    )


@pytest.mark.integration
def test_add_nested_list_of_structs(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    list_type_examples = ListType(
        element_id=1,
        element_type=StructType(
            NestedField(field_id=9, name="lat", field_type=DoubleType(), required=True),
            NestedField(field_id=10, name="long", field_type=DoubleType(), required=False),
        ),
        element_required=False,
    )

    with tbl.update_schema() as schema_update:
        schema_update.add_column("locations", list_type_examples)

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        NestedField(
            field_id=2,
            name="locations",
            field_type=ListType(
                type="list",
                element_id=3,
                element_type=StructType(
                    NestedField(field_id=4, name="lat", field_type=DoubleType(), required=True),
                    NestedField(field_id=5, name="long", field_type=DoubleType(), required=False),
                ),
                element_required=False,
            ),
            required=False,
        ),
    )


@pytest.mark.integration
def test_add_required_column(catalog: Catalog) -> None:
    schema_ = Schema(NestedField(field_id=1, name="a", field_type=BooleanType(), required=False))
    table = _create_table_with_schema(catalog, schema_)
    update = table.update_schema()
    with pytest.raises(ValueError, match="Incompatible change: cannot add required column: data"):
        update.add_column(path="data", field_type=IntegerType(), required=True)

    new_schema = (
        UpdateSchema(transaction=table.transaction(), allow_incompatible_changes=True)
        .add_column(path="data", field_type=IntegerType(), required=True)
        ._apply()
    )
    assert new_schema == Schema(
        NestedField(field_id=1, name="a", field_type=BooleanType(), required=False),
        NestedField(field_id=2, name="data", field_type=IntegerType(), required=True),
    )


@pytest.mark.integration
@pytest.mark.parametrize(
    "iceberg_type, initial_default, write_default",
    [
        (BooleanType(), True, False),
        (IntegerType(), 123, 456),
        (LongType(), 123, 456),
        (FloatType(), 19.25, 22.27),
        (DoubleType(), 19.25, 22.27),
        (DecimalType(10, 2), Decimal("19.25"), Decimal("22.27")),
        (DecimalType(10, 2), Decimal("19.25"), Decimal("22.27")),
        (StringType(), "abc", "def"),
        (DateType(), date(1990, 3, 1), date(1991, 3, 1)),
        (TimeType(), time(19, 25, 22), time(22, 25, 22)),
        (TimestampType(), datetime(1990, 5, 1, 22, 1, 1), datetime(2000, 5, 1, 22, 1, 1)),
        (
            TimestamptzType(),
            datetime(1990, 5, 1, 22, 1, 1, tzinfo=timezone.utc),
            datetime(2000, 5, 1, 22, 1, 1, tzinfo=timezone.utc),
        ),
        (BinaryType(), b"123", b"456"),
        (FixedType(4), b"1234", b"5678"),
        (UUIDType(), UUID(int=0x12345678123456781234567812345678), UUID(int=0x32145678123456781234567812345678)),
    ],
)
def test_initial_default_all_columns(
    catalog: Catalog, iceberg_type: PrimitiveType, initial_default: Any, write_default: Any
) -> None:
    # Round trips all the types through the rest catalog to check the serialization
    table = _create_table_with_schema(catalog, Schema(), properties={TableProperties.FORMAT_VERSION: 3})

    tx = table.update_schema()
    tx.add_column(path="data", field_type=iceberg_type, required=True, default_value=initial_default)
    tx.add_column(path="nested", field_type=StructType(), required=False)
    tx.commit()

    tx = table.update_schema()
    tx.add_column(path=("nested", "data"), field_type=iceberg_type, required=True, default_value=initial_default)
    tx.commit()

    for field_id in [1, 3]:
        field = table.schema().find_field(field_id)
        assert field.initial_default == initial_default
        assert field.write_default == initial_default

    with table.update_schema() as tx:
        tx.set_default_value("data", write_default)
        tx.set_default_value(("nested", "data"), write_default)

    for field_id in [1, 3]:
        field = table.schema().find_field(field_id)
        assert field.initial_default == initial_default
        assert field.write_default == write_default


@pytest.mark.integration
def test_add_required_column_initial_default(catalog: Catalog) -> None:
    schema_ = Schema(NestedField(field_id=1, name="a", field_type=BooleanType(), required=False))
    table = _create_table_with_schema(catalog, schema_, properties={TableProperties.FORMAT_VERSION: 3})

    table.update_schema().add_column(path="data", field_type=IntegerType(), required=True, default_value=22).commit()

    assert table.schema() == Schema(
        NestedField(field_id=1, name="a", field_type=BooleanType(), required=False),
        NestedField(field_id=2, name="data", field_type=IntegerType(), required=True, initial_default=22, write_default=22),
        schema_id=1,
    )

    # Update
    table.update_schema().update_column(path="data", field_type=LongType()).rename_column("a", "bool").commit()

    assert table.schema() == Schema(
        NestedField(field_id=1, name="bool", field_type=BooleanType(), required=False),
        NestedField(field_id=2, name="data", field_type=LongType(), required=True, initial_default=22, write_default=22),
        schema_id=1,
    )


@pytest.mark.integration
def test_add_required_column_initial_default_invalid_value(catalog: Catalog) -> None:
    schema_ = Schema(NestedField(field_id=1, name="a", field_type=BooleanType(), required=False))
    table = _create_table_with_schema(catalog, schema_)
    update = table.update_schema()
    with pytest.raises(ValueError, match="Invalid default value: Could not convert abc into a int"):
        update.add_column(path="data", field_type=IntegerType(), required=True, default_value="abc")


@pytest.mark.integration
def test_add_required_column_case_insensitive(catalog: Catalog) -> None:
    schema_ = Schema(NestedField(field_id=1, name="id", field_type=BooleanType(), required=False))
    table = _create_table_with_schema(catalog, schema_)

    with pytest.raises(ValueError, match="already exists: ID"):
        with table.transaction() as txn:
            with txn.update_schema(allow_incompatible_changes=True) as update:
                update.case_sensitive(False).add_column(path="ID", field_type=IntegerType(), required=True)

    new_schema = (
        UpdateSchema(transaction=table.transaction(), allow_incompatible_changes=True)
        .add_column(path="ID", field_type=IntegerType(), required=True)
        ._apply()
    )
    assert new_schema == Schema(
        NestedField(field_id=1, name="id", field_type=BooleanType(), required=False),
        NestedField(field_id=2, name="ID", field_type=IntegerType(), required=True),
    )


@pytest.mark.integration
def test_make_column_optional(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.make_column_optional("foo")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
    )


@pytest.mark.integration
def test_mixed_changes(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=StringType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=False),
            NestedField(
                field_id=3,
                name="preferences",
                field_type=StructType(
                    NestedField(field_id=8, name="feature1", type=BooleanType(), required=True),
                    NestedField(field_id=9, name="feature2", type=BooleanType(), required=False),
                ),
                required=False,
            ),
            NestedField(
                field_id=4,
                name="locations",
                field_type=MapType(
                    key_id=10,
                    value_id=11,
                    key_type=StructType(
                        NestedField(field_id=20, name="address", field_type=StringType(), required=True),
                        NestedField(field_id=21, name="city", field_type=StringType(), required=True),
                        NestedField(field_id=22, name="state", field_type=StringType(), required=True),
                        NestedField(field_id=23, name="zip", field_type=IntegerType(), required=True),
                    ),
                    value_type=StructType(
                        NestedField(field_id=12, name="lat", field_type=DoubleType(), required=True),
                        NestedField(field_id=13, name="long", field_type=DoubleType(), required=False),
                    ),
                ),
                required=True,
            ),
            NestedField(
                field_id=5,
                name="points",
                field_type=ListType(
                    element_id=14,
                    element_type=StructType(
                        NestedField(field_id=15, name="x", field_type=LongType(), required=True),
                        NestedField(field_id=16, name="y", field_type=LongType(), required=True),
                    ),
                ),
                required=True,
                doc="2-D cartesian points",
            ),
            NestedField(field_id=6, name="doubles", field_type=ListType(element_id=17, element_type=DoubleType()), required=True),
            NestedField(
                field_id=7,
                name="properties",
                field_type=MapType(key_id=18, value_id=19, key_type=StringType(), value_type=StringType()),
                required=False,
            ),
        ),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as schema_update:
        schema_update.add_column("toplevel", field_type=DecimalType(9, 2))
        schema_update.add_column(("locations", "alt"), field_type=FloatType())
        schema_update.add_column(("points", "z"), field_type=LongType())
        schema_update.add_column(("points", "t.t"), field_type=LongType(), doc="name with '.'")
        schema_update.rename_column("data", "json")
        schema_update.rename_column("preferences", "options")
        schema_update.rename_column("preferences.feature2", "newfeature")
        schema_update.rename_column("locations.lat", "latitude")
        schema_update.rename_column("points.x", "X")
        schema_update.rename_column("points.y", "y.y")
        schema_update.update_column("id", field_type=LongType(), doc="unique id")
        schema_update.update_column("locations.lat", DoubleType())
        schema_update.update_column("locations.lat", doc="latitude")
        schema_update.delete_column("locations.long")
        schema_update.delete_column("properties")
        schema_update.make_column_optional("points.x")
        schema_update.update_column("data", required=True)
        schema_update.add_column(("locations", "description"), StringType(), doc="location description")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True, doc="unique id"),
        NestedField(field_id=2, name="json", field_type=StringType(), required=True),
        NestedField(
            field_id=3,
            name="options",
            field_type=StructType(
                NestedField(field_id=8, name="feature1", field_type=BooleanType(), required=True),
                NestedField(field_id=9, name="newfeature", field_type=BooleanType(), required=False),
            ),
            required=False,
        ),
        NestedField(
            field_id=4,
            name="locations",
            field_type=MapType(
                type="map",
                key_id=10,
                key_type=StructType(
                    NestedField(field_id=12, name="address", field_type=StringType(), required=True),
                    NestedField(field_id=13, name="city", field_type=StringType(), required=True),
                    NestedField(field_id=14, name="state", field_type=StringType(), required=True),
                    NestedField(field_id=15, name="zip", field_type=IntegerType(), required=True),
                ),
                value_id=11,
                value_type=StructType(
                    NestedField(field_id=16, name="latitude", field_type=DoubleType(), required=True, doc="latitude"),
                    NestedField(field_id=25, name="alt", field_type=FloatType(), required=False),
                    NestedField(
                        field_id=28, name="description", field_type=StringType(), required=False, doc="location description"
                    ),
                ),
                value_required=True,
            ),
            required=True,
        ),
        NestedField(
            field_id=5,
            name="points",
            field_type=ListType(
                type="list",
                element_id=18,
                element_type=StructType(
                    NestedField(field_id=19, name="X", field_type=LongType(), required=False),
                    NestedField(field_id=20, name="y.y", field_type=LongType(), required=True),
                    NestedField(field_id=26, name="z", field_type=LongType(), required=False),
                    NestedField(field_id=27, name="t.t", field_type=LongType(), required=False, doc="name with '.'"),
                ),
                element_required=True,
            ),
            doc="2-D cartesian points",
            required=True,
        ),
        NestedField(
            field_id=6,
            name="doubles",
            field_type=ListType(type="list", element_id=21, element_type=DoubleType(), element_required=True),
            required=True,
        ),
        NestedField(field_id=24, name="toplevel", field_type=DecimalType(precision=9, scale=2), required=False),
    )


@pytest.mark.integration
def test_ambiguous_column(catalog: Catalog, table_schema_nested: Schema) -> None:
    table = _create_table_with_schema(catalog, table_schema_nested)
    update = UpdateSchema(transaction=table.transaction())

    with pytest.raises(ValueError) as exc_info:
        update.add_column(path="location.latitude", field_type=IntegerType())
    assert "Cannot add column with ambiguous name: location.latitude, provide a tuple instead" in str(exc_info.value)


@pytest.mark.integration
def test_delete_then_add(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.delete_column("foo")
        schema_update.add_column("foo", StringType())

    assert tbl.schema() == Schema(
        NestedField(field_id=2, name="foo", field_type=StringType(), required=False),
    )


@pytest.mark.integration
def test_delete_then_add_nested(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(
                field_id=1,
                name="preferences",
                field_type=StructType(
                    NestedField(field_id=2, name="feature1", field_type=BooleanType()),
                    NestedField(field_id=3, name="feature2", field_type=BooleanType()),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.delete_column("preferences.feature1")
        schema_update.add_column(("preferences", "feature1"), BooleanType())

    assert tbl.schema() == Schema(
        NestedField(
            field_id=1,
            name="preferences",
            field_type=StructType(
                NestedField(field_id=3, name="feature2", field_type=BooleanType()),
                NestedField(field_id=4, name="feature1", field_type=BooleanType(), required=False),
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_delete_missing_column(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.delete_column("bar")

    assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)


@pytest.mark.integration
def test_add_delete_conflict(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.add_column("bar", BooleanType())
            schema_update.delete_column("bar")
    assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)

    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(
                field_id=1,
                name="preferences",
                field_type=StructType(
                    NestedField(field_id=2, name="feature1", field_type=BooleanType()),
                    NestedField(field_id=3, name="feature2", field_type=BooleanType()),
                ),
                required=True,
            ),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.add_column(("preferences", "feature3"), BooleanType())
            schema_update.delete_column("preferences")
    assert "Cannot delete a column that has additions: preferences" in str(exc_info.value)


@pytest.mark.integration
def test_rename_missing_column(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.rename_column("bar", "fail")

    assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)


@pytest.mark.integration
def test_rename_missing_conflicts(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.rename_column("foo", "bar")
            schema_update.delete_column("foo")

    assert "Cannot delete a column that has updates: foo" in str(exc_info.value)

    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.rename_column("foo", "bar")
            schema_update.delete_column("bar")

    assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)


@pytest.mark.integration
def test_update_missing_column(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.update_column("bar", DateType())

    assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)


@pytest.mark.integration
def test_update_delete_conflict(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=IntegerType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.update_column("foo", LongType())
            schema_update.delete_column("foo")

    assert "Cannot delete a column that has updates: foo" in str(exc_info.value)


@pytest.mark.integration
def test_delete_update_conflict(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=IntegerType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.delete_column("foo")
            schema_update.update_column("foo", LongType())

    assert "Cannot update a column that will be deleted: foo" in str(exc_info.value)


@pytest.mark.integration
def test_delete_map_key(nested_table: Table) -> None:
    with pytest.raises(ValueError) as exc_info:
        with nested_table.update_schema() as schema_update:
            schema_update.delete_column("quux.key")

    assert "Cannot delete map keys" in str(exc_info.value)


@pytest.mark.integration
def test_add_field_to_map_key(nested_table: Table) -> None:
    with pytest.raises(ValueError) as exc_info:
        with nested_table.update_schema() as schema_update:
            schema_update.add_column(("quux", "key"), StringType())

    assert "Cannot add column 'key' to non-struct type: quux" in str(exc_info.value)


@pytest.mark.integration
def test_alter_map_key(nested_table: Table) -> None:
    with pytest.raises(ValueError) as exc_info:
        with nested_table.update_schema() as schema_update:
            schema_update.update_column(("quux", "key"), BinaryType())

    assert "Cannot update map keys" in str(exc_info.value)


@pytest.mark.integration
def test_update_map_key(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(
                field_id=1, name="m", field_type=MapType(key_id=2, value_id=3, key_type=IntegerType(), value_type=DoubleType())
            )
        ),
    )
    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.update_column("m.key", LongType())

    assert "Cannot update map keys: map<int, double>" in str(exc_info.value)


@pytest.mark.integration
def test_update_added_column_doc(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.add_column("value", LongType())
            schema_update.update_column("value", doc="a value")

    assert "Could not find field with name value, case_sensitive=True" in str(exc_info.value)


@pytest.mark.integration
def test_update_deleted_column_doc(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.delete_column("foo")
            schema_update.update_column("foo", doc="a value")

    assert "Cannot update a column that will be deleted: foo" in str(exc_info.value)


@pytest.mark.integration
def test_multiple_moves(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="a", field_type=IntegerType(), required=True),
            NestedField(field_id=2, name="b", field_type=IntegerType(), required=True),
            NestedField(field_id=3, name="c", field_type=IntegerType(), required=True),
            NestedField(field_id=4, name="d", field_type=IntegerType(), required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_first("d")
        schema_update.move_first("c")
        schema_update.move_after("b", "d")
        schema_update.move_before("d", "a")

    assert tbl.schema() == Schema(
        NestedField(field_id=3, name="c", field_type=IntegerType(), required=True),
        NestedField(field_id=2, name="b", field_type=IntegerType(), required=True),
        NestedField(field_id=4, name="d", field_type=IntegerType(), required=True),
        NestedField(field_id=1, name="a", field_type=IntegerType(), required=True),
    )


@pytest.mark.integration
def test_move_top_level_column_first(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_first("data")

    assert tbl.schema() == Schema(
        NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    )


@pytest.mark.integration
def test_move_top_level_column_before_first(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_before("data", "id")

    assert tbl.schema() == Schema(
        NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    )


@pytest.mark.integration
def test_move_top_level_column_after_last(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_after("id", "data")

    assert tbl.schema() == Schema(
        NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    )


@pytest.mark.integration
def test_move_nested_field_first(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                    NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_first("struct.data")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="struct",
            field_type=StructType(
                NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                NestedField(field_id=3, name="count", field_type=LongType(), required=True),
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_move_nested_field_before_first(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                    NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_before("struct.data", "struct.count")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="struct",
            field_type=StructType(
                NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                NestedField(field_id=3, name="count", field_type=LongType(), required=True),
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_move_nested_field_after_first(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                    NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_before("struct.data", "struct.count")

    assert str(tbl.schema()) == str(
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                    NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                ),
                required=True,
            ),
        )
    )


@pytest.mark.integration
def test_move_nested_field_after(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                    NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                    NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_after("struct.ts", "struct.count")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="struct",
            field_type=StructType(
                NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
                NestedField(field_id=4, name="data", field_type=StringType(), required=True),
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_move_nested_field_before(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                    NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                    NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_before("struct.ts", "struct.data")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="struct",
            field_type=StructType(
                NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
                NestedField(field_id=4, name="data", field_type=StringType(), required=True),
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_move_map_value_struct_field(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="map",
                field_type=MapType(
                    key_id=3,
                    value_id=4,
                    key_type=StringType(),
                    value_type=StructType(
                        NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
                        NestedField(field_id=6, name="count", field_type=LongType(), required=True),
                        NestedField(field_id=7, name="data", field_type=StringType(), required=True),
                    ),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.move_before("map.ts", "map.data")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="map",
            field_type=MapType(
                key_id=3,
                value_id=4,
                key_type=StringType(),
                value_type=StructType(
                    NestedField(field_id=6, name="count", field_type=LongType(), required=True),
                    NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
                    NestedField(field_id=7, name="data", field_type=StringType(), required=True),
                ),
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_move_added_top_level_column(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.add_column("ts", TimestamptzType())
        schema_update.move_after("ts", "id")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(field_id=3, name="ts", field_type=TimestamptzType(), required=False),
        NestedField(field_id=2, name="data", field_type=StringType(), required=True),
    )


@pytest.mark.integration
def test_move_added_top_level_column_after_added_column(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.add_column("ts", TimestamptzType())
        schema_update.add_column("count", LongType())
        schema_update.move_after("ts", "id")
        schema_update.move_after("count", "ts")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(field_id=3, name="ts", field_type=TimestamptzType(), required=False),
        NestedField(field_id=4, name="count", field_type=LongType(), required=False),
        NestedField(field_id=2, name="data", field_type=StringType(), required=True),
    )


@pytest.mark.integration
def test_move_added_nested_struct_field(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                    NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.add_column(("struct", "ts"), TimestamptzType())
        schema_update.move_before("struct.ts", "struct.count")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="struct",
            field_type=StructType(
                NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=False),
                NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                NestedField(field_id=4, name="data", field_type=StringType(), required=True),
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_move_added_nested_field_before_added_column(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(
                field_id=2,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                    NestedField(field_id=4, name="data", field_type=StringType(), required=True),
                ),
                required=True,
            ),
        ),
    )

    with tbl.update_schema() as schema_update:
        schema_update.add_column(("struct", "ts"), TimestamptzType())
        schema_update.add_column(("struct", "size"), LongType())
        schema_update.move_before("struct.ts", "struct.count")
        schema_update.move_before("struct.size", "struct.ts")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=True),
        NestedField(
            field_id=2,
            name="struct",
            field_type=StructType(
                NestedField(field_id=6, name="size", field_type=LongType(), required=False),
                NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=False),
                NestedField(field_id=3, name="count", field_type=LongType(), required=True),
                NestedField(field_id=4, name="data", field_type=StringType(), required=True),
            ),
            required=True,
        ),
    )


@pytest.mark.integration
def test_move_self_reference_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType()),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_before("foo", "foo")
    assert "Cannot move foo before itself" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_after("foo", "foo")
    assert "Cannot move foo after itself" in str(exc_info.value)


@pytest.mark.integration
def test_move_missing_column_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType()),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_first("items")
    assert "Cannot move missing column: items" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_before("items", "id")
    assert "Cannot move missing column: items" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_after("items", "data")
    assert "Cannot move missing column: items" in str(exc_info.value)


@pytest.mark.integration
def test_move_before_add_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType()),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_first("ts")
            update.add_column("ts", TimestamptzType())
    assert "Cannot move missing column: ts" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_before("ts", "id")
            update.add_column("ts", TimestamptzType())
    assert "Cannot move missing column: ts" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_after("ts", "data")
            update.add_column("ts", TimestamptzType())
    assert "Cannot move missing column: ts" in str(exc_info.value)


@pytest.mark.integration
def test_move_missing_reference_column_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_before("id", "items")
    assert "Cannot move id before missing column: items" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_after("data", "items")
    assert "Cannot move data after missing column: items" in str(exc_info.value)


@pytest.mark.integration
def test_move_primitive_map_key_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
            NestedField(
                field_id=3,
                name="map",
                field_type=MapType(key_id=4, value_id=5, key_type=StringType(), value_type=StringType()),
                required=False,
            ),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_before("map.key", "map.value")
    assert "Cannot move fields in non-struct type: map<string, string>" in str(exc_info.value)


@pytest.mark.integration
def test_move_primitive_map_value_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
            NestedField(
                field_id=3,
                name="map",
                field_type=MapType(key_id=4, value_id=5, key_type=StringType(), value_type=StructType()),
                required=False,
            ),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_before("map.value", "map.key")
    assert "Cannot move fields in non-struct type: map<string, struct<>>" in str(exc_info.value)


@pytest.mark.integration
def test_move_top_level_between_structs_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="a", field_type=IntegerType(), required=True),
            NestedField(field_id=2, name="b", field_type=IntegerType(), required=True),
            NestedField(
                field_id=3,
                name="struct",
                field_type=StructType(
                    NestedField(field_id=4, name="x", field_type=IntegerType(), required=True),
                    NestedField(field_id=5, name="y", field_type=IntegerType(), required=True),
                ),
                required=False,
            ),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_before("a", "struct.x")
    assert "Cannot move field a to a different struct" in str(exc_info.value)


@pytest.mark.integration
def test_move_between_structs_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(
                field_id=1,
                name="s1",
                field_type=StructType(
                    NestedField(field_id=3, name="a", field_type=IntegerType(), required=True),
                    NestedField(field_id=4, name="b", field_type=IntegerType(), required=True),
                ),
                required=False,
            ),
            NestedField(
                field_id=2,
                name="s2",
                field_type=StructType(
                    NestedField(field_id=5, name="x", field_type=IntegerType(), required=True),
                    NestedField(field_id=6, name="y", field_type=IntegerType(), required=True),
                ),
                required=False,
            ),
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update:
            update.move_before("s2.x", "s1.a")

    assert "Cannot move field s2.x to a different struct" in str(exc_info.value)


@pytest.mark.integration
def test_add_existing_identifier_fields(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema() as update_schema:
        update_schema.set_identifier_fields("foo")

    assert tbl.schema().identifier_field_names() == {"foo"}


@pytest.mark.integration
def test_add_new_identifiers_field_columns(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
        update_schema.add_column("new_field", StringType(), required=True)
        update_schema.set_identifier_fields("foo", "new_field")

    assert tbl.schema().identifier_field_names() == {"foo", "new_field"}


@pytest.mark.integration
def test_add_new_identifiers_field_columns_out_of_order(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
        update_schema.add_column("new_field", StringType(), required=True)
        update_schema.set_identifier_fields("foo", "new_field")

    assert tbl.schema().identifier_field_names() == {"foo", "new_field"}


@pytest.mark.integration
def test_add_nested_identifier_field_columns(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
        update_schema.add_column(
            "required_struct", StructType(NestedField(field_id=3, name="field", type=StringType(), required=True)), required=True
        )

    with tbl.update_schema() as update_schema:
        update_schema.set_identifier_fields("required_struct.field")

    assert tbl.schema().identifier_field_names() == {"required_struct.field"}


@pytest.mark.integration
def test_add_nested_identifier_field_columns_single_transaction(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
        update_schema.add_column(
            "new", StructType(NestedField(field_id=3, name="field", type=StringType(), required=True)), required=True
        )
        update_schema.set_identifier_fields("new.field")

    assert tbl.schema().identifier_field_names() == {"new.field"}


@pytest.mark.integration
def test_add_nested_nested_identifier_field_columns(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
        update_schema.add_column(
            "new",
            StructType(
                NestedField(
                    field_id=3,
                    name="field",
                    type=StructType(NestedField(field_id=4, name="nested", type=StringType(), required=True)),
                    required=True,
                )
            ),
            required=True,
        )
        update_schema.set_identifier_fields("new.field.nested")

    assert tbl.schema().identifier_field_names() == {"new.field.nested"}


@pytest.mark.integration
def test_add_dotted_identifier_field_columns(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
        update_schema.add_column(("dot.field",), StringType(), required=True)
        update_schema.set_identifier_fields("dot.field")

    assert tbl.schema().identifier_field_names() == {"dot.field"}


@pytest.mark.integration
def test_remove_identifier_fields(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
        update_schema.add_column(("new_field",), StringType(), required=True)
        update_schema.add_column(("new_field2",), StringType(), required=True)
        update_schema.set_identifier_fields("foo", "new_field", "new_field2")

    assert tbl.schema().identifier_field_names() == {"foo", "new_field", "new_field2"}

    with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
        update_schema.set_identifier_fields()

    assert tbl.schema().identifier_field_names() == set()


@pytest.mark.integration
def test_set_identifier_field_fails_schema(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
            NestedField(field_id=2, name="float", field_type=FloatType(), required=True),
            NestedField(field_id=3, name="double", field_type=DoubleType(), required=True),
            identifier_field_ids=[],
        ),
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update_schema:
            update_schema.set_identifier_fields("id")

    assert "Identifier field 1 invalid: not a required field" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update_schema:
            update_schema.set_identifier_fields("float")

    assert "Identifier field 2 invalid: must not be float or double field" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update_schema:
            update_schema.set_identifier_fields("double")

    assert "Identifier field 3 invalid: must not be float or double field" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as update_schema:
            update_schema.set_identifier_fields("unknown")

    assert "Cannot find identifier field unknown. In case of deletion, update the identifier fields first." in str(exc_info.value)


@pytest.mark.integration
def test_set_identifier_field_fails(nested_table: Table) -> None:
    with pytest.raises(ValueError) as exc_info:
        with nested_table.update_schema() as update_schema:
            update_schema.set_identifier_fields("location")

    assert "Identifier field 6 invalid: not a primitive type field" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with nested_table.update_schema() as update_schema:
            update_schema.set_identifier_fields("baz")

    assert "Identifier field 3 invalid: not a required field" in str(exc_info.value)

    with pytest.raises(ValueError) as exc_info:
        with nested_table.update_schema() as update_schema:
            update_schema.set_identifier_fields("person.name")

    assert "Identifier field 16 invalid: not a required field" in str(exc_info.value)


@pytest.mark.integration
def test_delete_identifier_field_columns(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema() as schema_update:
        schema_update.delete_column("foo")
        schema_update.set_identifier_fields()

    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema() as schema_update:
        schema_update.set_identifier_fields()
        schema_update.delete_column("foo")


@pytest.mark.integration
def test_delete_containing_nested_identifier_field_columns_fails(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema(allow_incompatible_changes=True) as schema_update:
        schema_update.add_column(
            "out", StructType(NestedField(field_id=3, name="nested", field_type=StringType(), required=True)), required=True
        )
        schema_update.set_identifier_fields("out.nested")

    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
        NestedField(
            field_id=2,
            name="out",
            field_type=StructType(NestedField(field_id=3, name="nested", field_type=StringType(), required=True)),
            required=True,
        ),
        identifier_field_ids=[3],
    )

    with pytest.raises(ValueError) as exc_info:
        with tbl.update_schema() as schema_update:
            schema_update.delete_column("out")

    assert "Cannot find identifier field out.nested. In case of deletion, update the identifier fields first." in str(exc_info)


@pytest.mark.integration
def test_rename_identifier_fields(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
    )

    with tbl.update_schema() as schema_update:
        schema_update.rename_column("foo", "bar")

    assert tbl.schema().identifier_field_ids == [1]
    assert tbl.schema().identifier_field_names() == {"bar"}


@pytest.mark.integration
def test_move_identifier_fields(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
            identifier_field_ids=[1],
        ),
    )

    with tbl.update_schema() as update:
        update.move_before("data", "id")

    assert tbl.schema().identifier_field_ids == [1]
    assert tbl.schema().identifier_field_names() == {"id"}

    with tbl.update_schema() as update:
        update.move_after("id", "data")

    assert tbl.schema().identifier_field_ids == [1]
    assert tbl.schema().identifier_field_names() == {"id"}

    with tbl.update_schema() as update:
        update.move_first("data")

    assert tbl.schema().identifier_field_ids == [1]
    assert tbl.schema().identifier_field_names() == {"id"}


@pytest.mark.integration
def test_move_identifier_fields_case_insensitive(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=True),
            identifier_field_ids=[1],
        ),
    )

    with tbl.update_schema(case_sensitive=False) as update:
        update.move_before("DATA", "ID")

    assert tbl.schema().identifier_field_ids == [1]
    assert tbl.schema().identifier_field_names() == {"id"}

    with tbl.update_schema(case_sensitive=False) as update:
        update.move_after("ID", "DATA")

    assert tbl.schema().identifier_field_ids == [1]
    assert tbl.schema().identifier_field_names() == {"id"}

    with tbl.update_schema(case_sensitive=False) as update:
        update.move_first("DATA")

    assert tbl.schema().identifier_field_ids == [1]
    assert tbl.schema().identifier_field_names() == {"id"}


@pytest.mark.integration
def test_two_add_schemas_in_a_single_transaction(catalog: Catalog) -> None:
    tbl = _create_table_with_schema(
        catalog,
        Schema(
            NestedField(field_id=1, name="foo", field_type=StringType()),
        ),
    )

    with tbl.transaction() as tr:
        with tr.update_schema() as update:
            update.add_column("bar", field_type=StringType())
        with tr.update_schema() as update:
            update.add_column("baz", field_type=StringType())

    assert tbl.schema().schema_id == 2
    assert tbl.schema() == Schema(
        NestedField(field_id=1, name="foo", field_type=StringType()),
        NestedField(field_id=2, name="bar", field_type=StringType()),
        NestedField(field_id=3, name="baz", field_type=StringType()),
    )


@pytest.mark.integration
def test_create_table_integrity_after_fresh_assignment(catalog: Catalog) -> None:
    schema = Schema(
        NestedField(field_id=5, name="col_uuid", field_type=UUIDType(), required=False),
        NestedField(field_id=4, name="col_fixed", field_type=FixedType(25), required=False),
    )
    partition_spec = PartitionSpec(
        PartitionField(source_id=5, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0
    )
    sort_order = SortOrder(SortField(source_id=4, transform=IdentityTransform()))
    tbl_name = "default.test_create_integrity"
    try:
        catalog.drop_table(tbl_name)
    except NoSuchTableError:
        pass
    tbl = catalog.create_table(identifier=tbl_name, schema=schema, partition_spec=partition_spec, sort_order=sort_order)
    expected_schema = Schema(
        NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
        NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
    )
    expected_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0
    )
    expected_sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
    assert tbl.schema() == expected_schema
    assert tbl.spec() == expected_spec
    assert tbl.sort_order() == expected_sort_order
