blob: f0ccb1b0e858ef4963999935a1d3bdb7df9a1e79 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
import pytest
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.exceptions import CommitFailedException, NoSuchTableError, ValidationError
from pyiceberg.schema import Schema, prune_columns
from pyiceberg.table import Table, UpdateSchema
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
@pytest.fixture()
def catalog() -> Catalog:
return load_catalog(
"local",
**{
"type": "rest",
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
)
@pytest.fixture()
def simple_table(catalog: Catalog, table_schema_simple: Schema) -> Table:
return _create_table_with_schema(catalog, table_schema_simple)
def _create_table_with_schema(catalog: Catalog, schema: Schema) -> Table:
tbl_name = "default.test_schema_evolution"
try:
catalog.drop_table(tbl_name)
except NoSuchTableError:
pass
return catalog.create_table(identifier=tbl_name, schema=schema)
@pytest.mark.integration
def test_add_already_exists(catalog: Catalog, table_schema_nested: Schema) -> None:
table = _create_table_with_schema(catalog, table_schema_nested)
update = UpdateSchema(table)
with pytest.raises(ValueError) as exc_info:
update.add_column("foo", IntegerType())
assert "already exists: foo" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
update.add_column(path=("location", "latitude"), field_type=IntegerType())
assert "already exists: location.latitude" in str(exc_info.value)
@pytest.mark.integration
def test_add_to_non_struct_type(catalog: Catalog, table_schema_simple: Schema) -> None:
table = _create_table_with_schema(catalog, table_schema_simple)
update = UpdateSchema(table)
with pytest.raises(ValueError) as exc_info:
update.add_column(path=("foo", "lat"), field_type=IntegerType())
assert "Cannot add column 'lat' to non-struct type: foo" in str(exc_info.value)
@pytest.mark.integration
def test_schema_evolution_nested_field(catalog: Catalog) -> None:
schema = Schema(
NestedField(
field_id=1,
name="foo",
field_type=StructType(NestedField(2, name="bar", field_type=StringType(), required=False)),
required=False,
),
)
tbl = _create_table_with_schema(catalog, schema)
assert tbl.schema() == schema
with pytest.raises(ValidationError) as exc_info:
with tbl.transaction() as tx:
tx.update_schema().update_column("foo", StringType()).commit()
assert "Cannot change column type: struct<2: bar: optional string> is not a primitive" in str(exc_info.value)
@pytest.mark.integration
def test_schema_evolution_via_transaction(catalog: Catalog) -> None:
schema = Schema(
NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
)
tbl = _create_table_with_schema(catalog, schema)
assert tbl.schema() == schema
with tbl.transaction() as tx:
tx.update_schema().add_column("col_string", StringType()).commit()
assert tbl.schema() == Schema(
NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
NestedField(field_id=3, name="col_string", field_type=StringType(), required=False),
)
tbl.update_schema().add_column("col_integer", IntegerType()).commit()
assert tbl.schema() == Schema(
NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
NestedField(field_id=3, name="col_string", field_type=StringType(), required=False),
NestedField(field_id=4, name="col_integer", field_type=IntegerType(), required=False),
)
with pytest.raises(CommitFailedException) as exc_info:
with tbl.transaction() as tx:
# Start a new update
schema_update = tx.update_schema()
# Do a concurrent update
tbl.update_schema().add_column("col_long", LongType()).commit()
# stage another update in the transaction
schema_update.add_column("col_double", DoubleType()).commit()
assert "Requirement failed: current schema changed: expected id 2 != 3" in str(exc_info.value)
assert tbl.schema() == Schema(
NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
NestedField(field_id=3, name="col_string", field_type=StringType(), required=False),
NestedField(field_id=4, name="col_integer", field_type=IntegerType(), required=False),
NestedField(field_id=5, name="col_long", field_type=LongType(), required=False),
)
@pytest.mark.integration
def test_schema_evolution_nested(catalog: Catalog) -> None:
nested_schema = Schema(
NestedField(
field_id=1,
name="location_lookup",
field_type=MapType(
key_id=10,
key_type=StringType(),
value_id=11,
value_type=StructType(
NestedField(field_id=110, name="x", field_type=FloatType(), required=False),
NestedField(field_id=111, name="y", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
NestedField(
field_id=2,
name="locations",
field_type=ListType(
element_id=20,
element_type=StructType(
NestedField(field_id=200, name="x", field_type=FloatType(), required=False),
NestedField(field_id=201, name="y", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
NestedField(
field_id=3,
name="person",
field_type=StructType(
NestedField(field_id=30, name="name", field_type=StringType(), required=False),
NestedField(field_id=31, name="age", field_type=IntegerType(), required=True),
),
required=False,
),
)
tbl = _create_table_with_schema(catalog, nested_schema)
assert tbl.schema().highest_field_id == 12
with tbl.update_schema() as schema_update:
schema_update.add_column(("location_lookup", "z"), FloatType())
schema_update.add_column(("locations", "z"), FloatType())
schema_update.add_column(("person", "address"), StringType())
assert str(tbl.schema()) == str(
Schema(
NestedField(
field_id=1,
name="location_lookup",
field_type=MapType(
type="map",
key_id=4,
key_type=StringType(),
value_id=5,
value_type=StructType(
NestedField(field_id=6, name="x", field_type=FloatType(), required=False),
NestedField(field_id=7, name="y", field_type=FloatType(), required=False),
NestedField(field_id=13, name="z", field_type=FloatType(), required=False),
),
value_required=True,
),
required=True,
),
NestedField(
field_id=2,
name="locations",
field_type=ListType(
type="list",
element_id=8,
element_type=StructType(
NestedField(field_id=9, name="x", field_type=FloatType(), required=False),
NestedField(field_id=10, name="y", field_type=FloatType(), required=False),
NestedField(field_id=14, name="z", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
NestedField(
field_id=3,
name="person",
field_type=StructType(
NestedField(field_id=11, name="name", field_type=StringType(), required=False),
NestedField(field_id=12, name="age", field_type=IntegerType(), required=True),
NestedField(field_id=15, name="address", field_type=StringType(), required=False),
),
required=False,
),
)
)
schema_nested = Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
NestedField(
field_id=4,
name="qux",
field_type=ListType(type="list", element_id=8, element_type=StringType(), element_required=True),
required=True,
),
NestedField(
field_id=5,
name="quux",
field_type=MapType(
type="map",
key_id=9,
key_type=StringType(),
value_id=10,
value_type=MapType(
type="map", key_id=11, key_type=StringType(), value_id=12, value_type=IntegerType(), value_required=True
),
value_required=True,
),
required=True,
),
NestedField(
field_id=6,
name="location",
field_type=ListType(
type="list",
element_id=13,
element_type=StructType(
NestedField(field_id=14, name="latitude", field_type=FloatType(), required=False),
NestedField(field_id=15, name="longitude", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
NestedField(
field_id=7,
name="person",
field_type=StructType(
NestedField(field_id=16, name="name", field_type=StringType(), required=False),
NestedField(field_id=17, name="age", field_type=IntegerType(), required=True),
),
required=False,
),
identifier_field_ids=[2],
)
@pytest.fixture()
def nested_table(catalog: Catalog) -> Table:
return _create_table_with_schema(catalog, schema_nested)
@pytest.mark.integration
def test_no_changes(simple_table: Table, table_schema_simple: Schema) -> None:
with simple_table.update_schema() as _:
pass
assert simple_table.schema() == table_schema_simple
@pytest.mark.integration
def test_no_changes_empty_commit(simple_table: Table, table_schema_simple: Schema) -> None:
with simple_table.update_schema() as update:
# No updates, so this should be a noop
update.update_column(path="foo")
assert simple_table.schema() == table_schema_simple
@pytest.mark.integration
def test_delete_field(simple_table: Table) -> None:
with simple_table.update_schema() as schema_update:
schema_update.delete_column("foo")
assert simple_table.schema() == Schema(
# foo is missing 👍
NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
identifier_field_ids=[2],
)
@pytest.mark.integration
def test_delete_field_case_insensitive(simple_table: Table) -> None:
with simple_table.update_schema(case_sensitive=False) as schema_update:
schema_update.delete_column("FOO")
assert simple_table.schema() == Schema(
# foo is missing 👍
NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
identifier_field_ids=[2],
)
@pytest.mark.integration
def test_delete_identifier_fields(simple_table: Table) -> None:
with pytest.raises(ValueError) as exc_info:
with simple_table.update_schema() as schema_update:
schema_update.delete_column("bar")
assert "Cannot find identifier field bar. In case of deletion, update the identifier fields first." in str(exc_info)
@pytest.mark.integration
def test_delete_identifier_fields_nested(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
NestedField(
field_id=2,
name="person",
field_type=StructType(
NestedField(field_id=3, name="name", field_type=StringType(), required=True),
NestedField(field_id=4, name="age", field_type=IntegerType(), required=True),
),
required=True,
),
identifier_field_ids=[3],
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.delete_column("person")
assert "Cannot find identifier field person.name. In case of deletion, update the identifier fields first." in str(exc_info)
@pytest.mark.parametrize(
"field",
[
"foo",
"baz",
"qux",
"quux",
"location",
"location.element.latitude",
"location.element.longitude",
"person",
"person.name",
"person.age",
],
)
@pytest.mark.integration
def test_deletes(field: str, nested_table: Table) -> None:
with nested_table.update_schema() as schema_update:
schema_update.delete_column(field)
selected_ids = {
field_id
for field_id in schema_nested.field_ids
if not isinstance(schema_nested.find_field(field_id).field_type, (MapType, ListType))
and not schema_nested.find_column_name(field_id).startswith(field) # type: ignore
}
expected_schema = prune_columns(schema_nested, selected_ids, select_full_types=False)
assert expected_schema == nested_table.schema()
@pytest.mark.parametrize(
"field",
[
"Foo",
"Baz",
"Qux",
"Quux",
"Location",
"Location.element.latitude",
"Location.element.longitude",
"Person",
"Person.name",
"Person.age",
],
)
@pytest.mark.integration
def test_deletes_case_insensitive(field: str, nested_table: Table) -> None:
with nested_table.update_schema(case_sensitive=False) as schema_update:
schema_update.delete_column(field)
selected_ids = {
field_id
for field_id in schema_nested.field_ids
if not isinstance(schema_nested.find_field(field_id).field_type, (MapType, ListType))
and not schema_nested.find_column_name(field_id).startswith(field.lower()) # type: ignore
}
expected_schema = prune_columns(schema_nested, selected_ids, select_full_types=False)
assert expected_schema == nested_table.schema()
@pytest.mark.integration
def test_update_types(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="bar", field_type=IntegerType(), required=True),
NestedField(
field_id=2,
name="location",
field_type=ListType(
type="list",
element_id=3,
element_type=StructType(
NestedField(field_id=4, name="latitude", field_type=FloatType(), required=False),
NestedField(field_id=5, name="longitude", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.update_column("bar", LongType())
schema_update.update_column("location.latitude", DoubleType())
schema_update.update_column("location.longitude", DoubleType())
assert tbl.schema() == Schema(
NestedField(field_id=1, name="bar", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="location",
field_type=ListType(
type="list",
element_id=3,
element_type=StructType(
NestedField(field_id=4, name="latitude", field_type=DoubleType(), required=False),
NestedField(field_id=5, name="longitude", field_type=DoubleType(), required=False),
),
element_required=True,
),
required=True,
),
)
@pytest.mark.integration
def test_update_types_case_insensitive(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="bar", field_type=IntegerType(), required=True),
NestedField(
field_id=2,
name="location",
field_type=ListType(
type="list",
element_id=3,
element_type=StructType(
NestedField(field_id=4, name="latitude", field_type=FloatType(), required=False),
NestedField(field_id=5, name="longitude", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
),
)
with tbl.update_schema(case_sensitive=False) as schema_update:
schema_update.update_column("baR", LongType())
schema_update.update_column("Location.Latitude", DoubleType())
schema_update.update_column("Location.Longitude", DoubleType())
assert tbl.schema() == Schema(
NestedField(field_id=1, name="bar", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="location",
field_type=ListType(
type="list",
element_id=3,
element_type=StructType(
NestedField(field_id=4, name="latitude", field_type=DoubleType(), required=False),
NestedField(field_id=5, name="longitude", field_type=DoubleType(), required=False),
),
element_required=True,
),
required=True,
),
)
allowed_promotions = [
(StringType(), BinaryType()),
(BinaryType(), StringType()),
(IntegerType(), LongType()),
(FloatType(), DoubleType()),
(DecimalType(9, 2), DecimalType(18, 2)),
]
@pytest.mark.parametrize("from_type, to_type", allowed_promotions, ids=str)
@pytest.mark.integration
def test_allowed_updates(from_type: PrimitiveType, to_type: PrimitiveType, catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="bar", field_type=from_type, required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.update_column("bar", to_type)
assert tbl.schema() == Schema(NestedField(field_id=1, name="bar", field_type=to_type, required=True))
disallowed_promotions_types = [
BooleanType(),
IntegerType(),
LongType(),
FloatType(),
DoubleType(),
DateType(),
TimeType(),
TimestampType(),
TimestamptzType(),
StringType(),
UUIDType(),
BinaryType(),
FixedType(3),
FixedType(4),
# We'll just allow Decimal promotions right now
# https://github.com/apache/iceberg/issues/8389
# DecimalType(9, 2),
# DecimalType(9, 3),
DecimalType(18, 2),
]
@pytest.mark.parametrize("from_type", disallowed_promotions_types, ids=str)
@pytest.mark.parametrize("to_type", disallowed_promotions_types, ids=str)
@pytest.mark.integration
def test_disallowed_updates(from_type: PrimitiveType, to_type: PrimitiveType, catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="bar", field_type=from_type, required=True),
),
)
if from_type != to_type and (from_type, to_type) not in allowed_promotions:
with pytest.raises(ValidationError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.update_column("bar", to_type)
assert str(exc_info.value).startswith("Cannot change column type: bar:")
else:
with tbl.update_schema() as schema_update:
schema_update.update_column("bar", to_type)
assert tbl.schema() == Schema(
NestedField(field_id=1, name="bar", field_type=to_type, required=True),
)
@pytest.mark.integration
def test_rename_simple(simple_table: Table) -> None:
with simple_table.update_schema() as schema_update:
schema_update.rename_column("foo", "vo")
assert simple_table.schema() == Schema(
NestedField(field_id=1, name="vo", field_type=StringType(), required=False),
NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
identifier_field_ids=[2],
)
@pytest.mark.integration
def test_rename_simple_nested(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(
field_id=1,
name="foo",
field_type=StructType(NestedField(field_id=2, name="bar", field_type=StringType())),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.rename_column("foo.bar", "vo")
assert tbl.schema() == Schema(
NestedField(
field_id=1,
name="foo",
field_type=StructType(NestedField(field_id=2, name="vo", field_type=StringType())),
required=True,
),
)
@pytest.mark.integration
def test_rename_simple_nested_with_dots(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(
field_id=1,
name="a.b",
field_type=StructType(NestedField(field_id=2, name="c.d", field_type=StringType())),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.rename_column(("a.b", "c.d"), "e.f")
assert tbl.schema() == Schema(
NestedField(
field_id=1,
name="a.b",
field_type=StructType(NestedField(field_id=2, name="e.f", field_type=StringType())),
required=True,
),
)
@pytest.mark.integration
def test_rename(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(
field_id=1,
name="location_lookup",
field_type=MapType(
type="map",
key_id=5,
key_type=StringType(),
value_id=6,
value_type=StructType(
NestedField(field_id=7, name="x", field_type=FloatType(), required=False),
NestedField(field_id=8, name="y", field_type=FloatType(), required=False),
),
value_required=True,
),
required=True,
),
NestedField(
field_id=2,
name="locations",
field_type=ListType(
type="list",
element_id=9,
element_type=StructType(
NestedField(field_id=10, name="x", field_type=FloatType(), required=False),
NestedField(field_id=11, name="y", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
NestedField(
field_id=3,
name="person",
field_type=StructType(
NestedField(field_id=12, name="name", field_type=StringType(), required=False),
NestedField(field_id=13, name="leeftijd", field_type=IntegerType(), required=True),
),
required=False,
),
NestedField(field_id=4, name="foo", field_type=StringType(), required=True),
identifier_field_ids=[],
),
)
with tbl.update_schema() as schema_update:
schema_update.rename_column("foo", "bar")
schema_update.rename_column("location_lookup.x", "latitude")
schema_update.rename_column("locations.x", "latitude")
schema_update.rename_column("person.leeftijd", "age")
assert tbl.schema() == Schema(
NestedField(
field_id=1,
name="location_lookup",
field_type=MapType(
type="map",
key_id=5,
key_type=StringType(),
value_id=6,
value_type=StructType(
NestedField(field_id=7, name="latitude", field_type=FloatType(), required=False),
NestedField(field_id=8, name="y", field_type=FloatType(), required=False),
),
value_required=True,
),
required=True,
),
NestedField(
field_id=2,
name="locations",
field_type=ListType(
type="list",
element_id=9,
element_type=StructType(
NestedField(field_id=10, name="latitude", field_type=FloatType(), required=False),
NestedField(field_id=11, name="y", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
NestedField(
field_id=3,
name="person",
field_type=StructType(
NestedField(field_id=12, name="name", field_type=StringType(), required=False),
NestedField(field_id=13, name="age", field_type=IntegerType(), required=True),
),
required=False,
),
NestedField(field_id=4, name="bar", field_type=StringType(), required=True),
identifier_field_ids=[],
)
@pytest.mark.integration
def test_rename_case_insensitive(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(
field_id=1,
name="location_lookup",
field_type=MapType(
type="map",
key_id=5,
key_type=StringType(),
value_id=6,
value_type=StructType(
NestedField(field_id=7, name="x", field_type=FloatType(), required=False),
NestedField(field_id=8, name="y", field_type=FloatType(), required=False),
),
value_required=True,
),
required=True,
),
NestedField(
field_id=2,
name="locations",
field_type=ListType(
type="list",
element_id=9,
element_type=StructType(
NestedField(field_id=10, name="x", field_type=FloatType(), required=False),
NestedField(field_id=11, name="y", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
NestedField(
field_id=3,
name="person",
field_type=StructType(
NestedField(field_id=12, name="name", field_type=StringType(), required=False),
NestedField(field_id=13, name="leeftijd", field_type=IntegerType(), required=True),
),
required=True,
),
NestedField(field_id=4, name="foo", field_type=StringType(), required=True),
identifier_field_ids=[13],
),
)
with tbl.update_schema(case_sensitive=False) as schema_update:
schema_update.rename_column("Foo", "bar")
schema_update.rename_column("Location_lookup.X", "latitude")
schema_update.rename_column("Locations.X", "latitude")
schema_update.rename_column("Person.Leeftijd", "age")
assert tbl.schema() == Schema(
NestedField(
field_id=1,
name="location_lookup",
field_type=MapType(
type="map",
key_id=5,
key_type=StringType(),
value_id=6,
value_type=StructType(
NestedField(field_id=7, name="latitude", field_type=FloatType(), required=False),
NestedField(field_id=8, name="y", field_type=FloatType(), required=False),
),
value_required=True,
),
required=True,
),
NestedField(
field_id=2,
name="locations",
field_type=ListType(
type="list",
element_id=9,
element_type=StructType(
NestedField(field_id=10, name="latitude", field_type=FloatType(), required=False),
NestedField(field_id=11, name="y", field_type=FloatType(), required=False),
),
element_required=True,
),
required=True,
),
NestedField(
field_id=3,
name="person",
field_type=StructType(
NestedField(field_id=12, name="name", field_type=StringType(), required=False),
NestedField(field_id=13, name="age", field_type=IntegerType(), required=True),
),
required=True,
),
NestedField(field_id=4, name="bar", field_type=StringType(), required=True),
identifier_field_ids=[13],
)
@pytest.mark.integration
def test_add_struct(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType()),
),
)
struct = StructType(
NestedField(field_id=3, name="x", field_type=DoubleType(), required=False),
NestedField(field_id=4, name="y", field_type=DoubleType(), required=False),
)
with tbl.update_schema() as schema_update:
schema_update.add_column("location", struct)
assert tbl.schema() == Schema(
NestedField(field_id=1, name="foo", field_type=StringType()),
NestedField(field_id=2, name="location", field_type=struct, required=False),
)
@pytest.mark.integration
def test_add_nested_map_of_structs(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType()),
),
)
map_type_example = MapType(
key_id=1,
value_id=2,
key_type=StructType(
NestedField(field_id=20, name="address", field_type=StringType(), required=True),
NestedField(field_id=21, name="city", field_type=StringType(), required=True),
NestedField(field_id=22, name="state", field_type=StringType(), required=True),
NestedField(field_id=23, name="zip", field_type=IntegerType(), required=True),
),
value_type=StructType(
NestedField(field_id=9, name="lat", field_type=DoubleType(), required=True),
NestedField(field_id=8, name="long", field_type=DoubleType(), required=False),
),
)
with tbl.update_schema() as schema_update:
schema_update.add_column("locations", map_type_example)
assert tbl.schema() == Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
NestedField(
field_id=2,
name="locations",
field_type=MapType(
type="map",
key_id=3,
key_type=StructType(
NestedField(field_id=5, name="address", field_type=StringType(), required=True),
NestedField(field_id=6, name="city", field_type=StringType(), required=True),
NestedField(field_id=7, name="state", field_type=StringType(), required=True),
NestedField(field_id=8, name="zip", field_type=IntegerType(), required=True),
),
value_id=4,
value_type=StructType(
NestedField(field_id=9, name="lat", field_type=DoubleType(), required=True),
NestedField(field_id=10, name="long", field_type=DoubleType(), required=False),
),
value_required=True,
),
required=False,
),
)
@pytest.mark.integration
def test_add_nested_list_of_structs(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType()),
),
)
list_type_examples = ListType(
element_id=1,
element_type=StructType(
NestedField(field_id=9, name="lat", field_type=DoubleType(), required=True),
NestedField(field_id=10, name="long", field_type=DoubleType(), required=False),
),
element_required=False,
)
with tbl.update_schema() as schema_update:
schema_update.add_column("locations", list_type_examples)
assert tbl.schema() == Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
NestedField(
field_id=2,
name="locations",
field_type=ListType(
type="list",
element_id=3,
element_type=StructType(
NestedField(field_id=4, name="lat", field_type=DoubleType(), required=True),
NestedField(field_id=5, name="long", field_type=DoubleType(), required=False),
),
element_required=False,
),
required=False,
),
)
@pytest.mark.integration
def test_add_required_column(catalog: Catalog) -> None:
schema_ = Schema(NestedField(field_id=1, name="a", field_type=BooleanType(), required=False))
table = _create_table_with_schema(catalog, schema_)
update = UpdateSchema(table)
with pytest.raises(ValueError) as exc_info:
update.add_column(path="data", field_type=IntegerType(), required=True)
assert "Incompatible change: cannot add required column: data" in str(exc_info.value)
new_schema = (
UpdateSchema(table, allow_incompatible_changes=True) # pylint: disable=W0212
.add_column(path="data", field_type=IntegerType(), required=True)
._apply()
)
assert new_schema == Schema(
NestedField(field_id=1, name="a", field_type=BooleanType(), required=False),
NestedField(field_id=2, name="data", field_type=IntegerType(), required=True),
)
@pytest.mark.integration
def test_add_required_column_case_insensitive(catalog: Catalog) -> None:
schema_ = Schema(NestedField(field_id=1, name="id", field_type=BooleanType(), required=False))
table = _create_table_with_schema(catalog, schema_)
with pytest.raises(ValueError) as exc_info:
with UpdateSchema(table, allow_incompatible_changes=True) as update:
update.case_sensitive(False).add_column(path="ID", field_type=IntegerType(), required=True)
assert "already exists: ID" in str(exc_info.value)
new_schema = (
UpdateSchema(table, allow_incompatible_changes=True) # pylint: disable=W0212
.add_column(path="ID", field_type=IntegerType(), required=True)
._apply()
)
assert new_schema == Schema(
NestedField(field_id=1, name="id", field_type=BooleanType(), required=False),
NestedField(field_id=2, name="ID", field_type=IntegerType(), required=True),
)
@pytest.mark.integration
def test_make_column_optional(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.make_column_optional("foo")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
)
@pytest.mark.integration
def test_mixed_changes(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=StringType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=False),
NestedField(
field_id=3,
name="preferences",
field_type=StructType(
NestedField(field_id=8, name="feature1", type=BooleanType(), required=True),
NestedField(field_id=9, name="feature2", type=BooleanType(), required=False),
),
required=False,
),
NestedField(
field_id=4,
name="locations",
field_type=MapType(
key_id=10,
value_id=11,
key_type=StructType(
NestedField(field_id=20, name="address", field_type=StringType(), required=True),
NestedField(field_id=21, name="city", field_type=StringType(), required=True),
NestedField(field_id=22, name="state", field_type=StringType(), required=True),
NestedField(field_id=23, name="zip", field_type=IntegerType(), required=True),
),
value_type=StructType(
NestedField(field_id=12, name="lat", field_type=DoubleType(), required=True),
NestedField(field_id=13, name="long", field_type=DoubleType(), required=False),
),
),
required=True,
),
NestedField(
field_id=5,
name="points",
field_type=ListType(
element_id=14,
element_type=StructType(
NestedField(field_id=15, name="x", field_type=LongType(), required=True),
NestedField(field_id=16, name="y", field_type=LongType(), required=True),
),
),
required=True,
doc="2-D cartesian points",
),
NestedField(field_id=6, name="doubles", field_type=ListType(element_id=17, element_type=DoubleType()), required=True),
NestedField(
field_id=7,
name="properties",
field_type=MapType(key_id=18, value_id=19, key_type=StringType(), value_type=StringType()),
required=False,
),
),
)
with tbl.update_schema(allow_incompatible_changes=True) as schema_update:
schema_update.add_column("toplevel", field_type=DecimalType(9, 2))
schema_update.add_column(("locations", "alt"), field_type=FloatType())
schema_update.add_column(("points", "z"), field_type=LongType())
schema_update.add_column(("points", "t.t"), field_type=LongType(), doc="name with '.'")
schema_update.rename_column("data", "json")
schema_update.rename_column("preferences", "options")
schema_update.rename_column("preferences.feature2", "newfeature")
schema_update.rename_column("locations.lat", "latitude")
schema_update.rename_column("points.x", "X")
schema_update.rename_column("points.y", "y.y")
schema_update.update_column("id", field_type=LongType(), doc="unique id")
schema_update.update_column("locations.lat", DoubleType())
schema_update.update_column("locations.lat", doc="latitude")
schema_update.delete_column("locations.long")
schema_update.delete_column("properties")
schema_update.make_column_optional("points.x")
schema_update.update_column("data", required=True)
schema_update.add_column(("locations", "description"), StringType(), doc="location description")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True, doc="unique id"),
NestedField(field_id=2, name="json", field_type=StringType(), required=True),
NestedField(
field_id=3,
name="options",
field_type=StructType(
NestedField(field_id=8, name="feature1", field_type=BooleanType(), required=True),
NestedField(field_id=9, name="newfeature", field_type=BooleanType(), required=False),
),
required=False,
),
NestedField(
field_id=4,
name="locations",
field_type=MapType(
type="map",
key_id=10,
key_type=StructType(
NestedField(field_id=12, name="address", field_type=StringType(), required=True),
NestedField(field_id=13, name="city", field_type=StringType(), required=True),
NestedField(field_id=14, name="state", field_type=StringType(), required=True),
NestedField(field_id=15, name="zip", field_type=IntegerType(), required=True),
),
value_id=11,
value_type=StructType(
NestedField(field_id=16, name="latitude", field_type=DoubleType(), required=True, doc="latitude"),
NestedField(field_id=25, name="alt", field_type=FloatType(), required=False),
NestedField(
field_id=28, name="description", field_type=StringType(), required=False, doc="location description"
),
),
value_required=True,
),
required=True,
),
NestedField(
field_id=5,
name="points",
field_type=ListType(
type="list",
element_id=18,
element_type=StructType(
NestedField(field_id=19, name="X", field_type=LongType(), required=False),
NestedField(field_id=20, name="y.y", field_type=LongType(), required=True),
NestedField(field_id=26, name="z", field_type=LongType(), required=False),
NestedField(field_id=27, name="t.t", field_type=LongType(), required=False, doc="name with '.'"),
),
element_required=True,
),
doc="2-D cartesian points",
required=True,
),
NestedField(
field_id=6,
name="doubles",
field_type=ListType(type="list", element_id=21, element_type=DoubleType(), element_required=True),
required=True,
),
NestedField(field_id=24, name="toplevel", field_type=DecimalType(precision=9, scale=2), required=False),
)
@pytest.mark.integration
def test_ambiguous_column(catalog: Catalog, table_schema_nested: Schema) -> None:
table = _create_table_with_schema(catalog, table_schema_nested)
update = UpdateSchema(table)
with pytest.raises(ValueError) as exc_info:
update.add_column(path="location.latitude", field_type=IntegerType())
assert "Cannot add column with ambiguous name: location.latitude, provide a tuple instead" in str(exc_info.value)
@pytest.mark.integration
def test_delete_then_add(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.delete_column("foo")
schema_update.add_column("foo", StringType())
assert tbl.schema() == Schema(
NestedField(field_id=2, name="foo", field_type=StringType(), required=False),
)
@pytest.mark.integration
def test_delete_then_add_nested(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(
field_id=1,
name="preferences",
field_type=StructType(
NestedField(field_id=2, name="feature1", field_type=BooleanType()),
NestedField(field_id=3, name="feature2", field_type=BooleanType()),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.delete_column("preferences.feature1")
schema_update.add_column(("preferences", "feature1"), BooleanType())
assert tbl.schema() == Schema(
NestedField(
field_id=1,
name="preferences",
field_type=StructType(
NestedField(field_id=3, name="feature2", field_type=BooleanType()),
NestedField(field_id=4, name="feature1", field_type=BooleanType(), required=False),
),
required=True,
),
)
@pytest.mark.integration
def test_delete_missing_column(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.delete_column("bar")
assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)
@pytest.mark.integration
def test_add_delete_conflict(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.add_column("bar", BooleanType())
schema_update.delete_column("bar")
assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(
field_id=1,
name="preferences",
field_type=StructType(
NestedField(field_id=2, name="feature1", field_type=BooleanType()),
NestedField(field_id=3, name="feature2", field_type=BooleanType()),
),
required=True,
),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.add_column(("preferences", "feature3"), BooleanType())
schema_update.delete_column("preferences")
assert "Cannot delete a column that has additions: preferences" in str(exc_info.value)
@pytest.mark.integration
def test_rename_missing_column(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.rename_column("bar", "fail")
assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)
@pytest.mark.integration
def test_rename_missing_conflicts(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.rename_column("foo", "bar")
schema_update.delete_column("foo")
assert "Cannot delete a column that has updates: foo" in str(exc_info.value)
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.rename_column("foo", "bar")
schema_update.delete_column("bar")
assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)
@pytest.mark.integration
def test_update_missing_column(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.update_column("bar", DateType())
assert "Could not find field with name bar, case_sensitive=True" in str(exc_info.value)
@pytest.mark.integration
def test_update_delete_conflict(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=IntegerType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.update_column("foo", LongType())
schema_update.delete_column("foo")
assert "Cannot delete a column that has updates: foo" in str(exc_info.value)
@pytest.mark.integration
def test_delete_update_conflict(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=IntegerType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.delete_column("foo")
schema_update.update_column("foo", LongType())
assert "Cannot update a column that will be deleted: foo" in str(exc_info.value)
@pytest.mark.integration
def test_delete_map_key(nested_table: Table) -> None:
with pytest.raises(ValueError) as exc_info:
with nested_table.update_schema() as schema_update:
schema_update.delete_column("quux.key")
assert "Cannot delete map keys" in str(exc_info.value)
@pytest.mark.integration
def test_add_field_to_map_key(nested_table: Table) -> None:
with pytest.raises(ValueError) as exc_info:
with nested_table.update_schema() as schema_update:
schema_update.add_column(("quux", "key"), StringType())
assert "Cannot add column 'key' to non-struct type: quux" in str(exc_info.value)
@pytest.mark.integration
def test_alter_map_key(nested_table: Table) -> None:
with pytest.raises(ValueError) as exc_info:
with nested_table.update_schema() as schema_update:
schema_update.update_column(("quux", "key"), BinaryType())
assert "Cannot update map keys" in str(exc_info.value)
@pytest.mark.integration
def test_update_map_key(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(
field_id=1, name="m", field_type=MapType(key_id=2, value_id=3, key_type=IntegerType(), value_type=DoubleType())
)
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.update_column("m.key", LongType())
assert "Cannot update map keys: map<int, double>" in str(exc_info.value)
@pytest.mark.integration
def test_update_added_column_doc(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.add_column("value", LongType())
schema_update.update_column("value", doc="a value")
assert "Could not find field with name value, case_sensitive=True" in str(exc_info.value)
@pytest.mark.integration
def test_update_deleted_column_doc(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.delete_column("foo")
schema_update.update_column("foo", doc="a value")
assert "Cannot update a column that will be deleted: foo" in str(exc_info.value)
@pytest.mark.integration
def test_multiple_moves(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="a", field_type=IntegerType(), required=True),
NestedField(field_id=2, name="b", field_type=IntegerType(), required=True),
NestedField(field_id=3, name="c", field_type=IntegerType(), required=True),
NestedField(field_id=4, name="d", field_type=IntegerType(), required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_first("d")
schema_update.move_first("c")
schema_update.move_after("b", "d")
schema_update.move_before("d", "a")
assert tbl.schema() == Schema(
NestedField(field_id=3, name="c", field_type=IntegerType(), required=True),
NestedField(field_id=2, name="b", field_type=IntegerType(), required=True),
NestedField(field_id=4, name="d", field_type=IntegerType(), required=True),
NestedField(field_id=1, name="a", field_type=IntegerType(), required=True),
)
@pytest.mark.integration
def test_move_top_level_column_first(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_first("data")
assert tbl.schema() == Schema(
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
)
@pytest.mark.integration
def test_move_top_level_column_before_first(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_before("data", "id")
assert tbl.schema() == Schema(
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
)
@pytest.mark.integration
def test_move_top_level_column_after_last(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_after("id", "data")
assert tbl.schema() == Schema(
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
)
@pytest.mark.integration
def test_move_nested_field_first(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_first("struct.data")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
),
required=True,
),
)
@pytest.mark.integration
def test_move_nested_field_before_first(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_before("struct.data", "struct.count")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
),
required=True,
),
)
@pytest.mark.integration
def test_move_nested_field_after_first(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_before("struct.data", "struct.count")
assert str(tbl.schema()) == str(
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
),
required=True,
),
)
)
@pytest.mark.integration
def test_move_nested_field_after(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_after("struct.ts", "struct.count")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
)
@pytest.mark.integration
def test_move_nested_field_before(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_before("struct.ts", "struct.data")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
)
@pytest.mark.integration
def test_move_map_value_struct_field(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="map",
field_type=MapType(
key_id=3,
value_id=4,
key_type=StringType(),
value_type=StructType(
NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
NestedField(field_id=6, name="count", field_type=LongType(), required=True),
NestedField(field_id=7, name="data", field_type=StringType(), required=True),
),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.move_before("map.ts", "map.data")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="map",
field_type=MapType(
key_id=3,
value_id=4,
key_type=StringType(),
value_type=StructType(
NestedField(field_id=6, name="count", field_type=LongType(), required=True),
NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=True),
NestedField(field_id=7, name="data", field_type=StringType(), required=True),
),
),
required=True,
),
)
@pytest.mark.integration
def test_move_added_top_level_column(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.add_column("ts", TimestamptzType())
schema_update.move_after("ts", "id")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=3, name="ts", field_type=TimestamptzType(), required=False),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
)
@pytest.mark.integration
def test_move_added_top_level_column_after_added_column(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
),
)
with tbl.update_schema() as schema_update:
schema_update.add_column("ts", TimestamptzType())
schema_update.add_column("count", LongType())
schema_update.move_after("ts", "id")
schema_update.move_after("count", "ts")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=3, name="ts", field_type=TimestamptzType(), required=False),
NestedField(field_id=4, name="count", field_type=LongType(), required=False),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
)
@pytest.mark.integration
def test_move_added_nested_struct_field(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.add_column(("struct", "ts"), TimestamptzType())
schema_update.move_before("struct.ts", "struct.count")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=False),
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
)
@pytest.mark.integration
def test_move_added_nested_field_before_added_column(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
),
)
with tbl.update_schema() as schema_update:
schema_update.add_column(("struct", "ts"), TimestamptzType())
schema_update.add_column(("struct", "size"), LongType())
schema_update.move_before("struct.ts", "struct.count")
schema_update.move_before("struct.size", "struct.ts")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(
field_id=2,
name="struct",
field_type=StructType(
NestedField(field_id=6, name="size", field_type=LongType(), required=False),
NestedField(field_id=5, name="ts", field_type=TimestamptzType(), required=False),
NestedField(field_id=3, name="count", field_type=LongType(), required=True),
NestedField(field_id=4, name="data", field_type=StringType(), required=True),
),
required=True,
),
)
@pytest.mark.integration
def test_move_self_reference_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType()),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_before("foo", "foo")
assert "Cannot move foo before itself" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_after("foo", "foo")
assert "Cannot move foo after itself" in str(exc_info.value)
@pytest.mark.integration
def test_move_missing_column_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType()),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_first("items")
assert "Cannot move missing column: items" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_before("items", "id")
assert "Cannot move missing column: items" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_after("items", "data")
assert "Cannot move missing column: items" in str(exc_info.value)
@pytest.mark.integration
def test_move_before_add_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType()),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_first("ts")
update.add_column("ts", TimestamptzType())
assert "Cannot move missing column: ts" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_before("ts", "id")
update.add_column("ts", TimestamptzType())
assert "Cannot move missing column: ts" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_after("ts", "data")
update.add_column("ts", TimestamptzType())
assert "Cannot move missing column: ts" in str(exc_info.value)
@pytest.mark.integration
def test_move_missing_reference_column_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_before("id", "items")
assert "Cannot move id before missing column: items" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_after("data", "items")
assert "Cannot move data after missing column: items" in str(exc_info.value)
@pytest.mark.integration
def test_move_primitive_map_key_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
NestedField(
field_id=3,
name="map",
field_type=MapType(key_id=4, value_id=5, key_type=StringType(), value_type=StringType()),
required=False,
),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_before("map.key", "map.value")
assert "Cannot move fields in non-struct type: map<string, string>" in str(exc_info.value)
@pytest.mark.integration
def test_move_primitive_map_value_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
NestedField(
field_id=3,
name="map",
field_type=MapType(key_id=4, value_id=5, key_type=StringType(), value_type=StructType()),
required=False,
),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_before("map.value", "map.key")
assert "Cannot move fields in non-struct type: map<string, struct<>>" in str(exc_info.value)
@pytest.mark.integration
def test_move_top_level_between_structs_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="a", field_type=IntegerType(), required=True),
NestedField(field_id=2, name="b", field_type=IntegerType(), required=True),
NestedField(
field_id=3,
name="struct",
field_type=StructType(
NestedField(field_id=4, name="x", field_type=IntegerType(), required=True),
NestedField(field_id=5, name="y", field_type=IntegerType(), required=True),
),
required=False,
),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_before("a", "struct.x")
assert "Cannot move field a to a different struct" in str(exc_info.value)
@pytest.mark.integration
def test_move_between_structs_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(
field_id=1,
name="s1",
field_type=StructType(
NestedField(field_id=3, name="a", field_type=IntegerType(), required=True),
NestedField(field_id=4, name="b", field_type=IntegerType(), required=True),
),
required=False,
),
NestedField(
field_id=2,
name="s2",
field_type=StructType(
NestedField(field_id=5, name="x", field_type=IntegerType(), required=True),
NestedField(field_id=6, name="y", field_type=IntegerType(), required=True),
),
required=False,
),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update:
update.move_before("s2.x", "s1.a")
assert "Cannot move field s2.x to a different struct" in str(exc_info.value)
@pytest.mark.integration
def test_add_existing_identifier_fields(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema() as update_schema:
update_schema.set_identifier_fields("foo")
assert tbl.schema().identifier_field_names() == {"foo"}
@pytest.mark.integration
def test_add_new_identifiers_field_columns(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
update_schema.add_column("new_field", StringType(), required=True)
update_schema.set_identifier_fields("foo", "new_field")
assert tbl.schema().identifier_field_names() == {"foo", "new_field"}
@pytest.mark.integration
def test_add_new_identifiers_field_columns_out_of_order(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
update_schema.add_column("new_field", StringType(), required=True)
update_schema.set_identifier_fields("foo", "new_field")
assert tbl.schema().identifier_field_names() == {"foo", "new_field"}
@pytest.mark.integration
def test_add_nested_identifier_field_columns(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
update_schema.add_column(
"required_struct", StructType(NestedField(field_id=3, name="field", type=StringType(), required=True)), required=True
)
with tbl.update_schema() as update_schema:
update_schema.set_identifier_fields("required_struct.field")
assert tbl.schema().identifier_field_names() == {"required_struct.field"}
@pytest.mark.integration
def test_add_nested_identifier_field_columns_single_transaction(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
update_schema.add_column(
"new", StructType(NestedField(field_id=3, name="field", type=StringType(), required=True)), required=True
)
update_schema.set_identifier_fields("new.field")
assert tbl.schema().identifier_field_names() == {"new.field"}
@pytest.mark.integration
def test_add_nested_nested_identifier_field_columns(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
update_schema.add_column(
"new",
StructType(
NestedField(
field_id=3,
name="field",
type=StructType(NestedField(field_id=4, name="nested", type=StringType(), required=True)),
required=True,
)
),
required=True,
)
update_schema.set_identifier_fields("new.field.nested")
assert tbl.schema().identifier_field_names() == {"new.field.nested"}
@pytest.mark.integration
def test_add_dotted_identifier_field_columns(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
update_schema.add_column(("dot.field",), StringType(), required=True)
update_schema.set_identifier_fields("dot.field")
assert tbl.schema().identifier_field_names() == {"dot.field"}
@pytest.mark.integration
def test_remove_identifier_fields(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
update_schema.add_column(("new_field",), StringType(), required=True)
update_schema.add_column(("new_field2",), StringType(), required=True)
update_schema.set_identifier_fields("foo", "new_field", "new_field2")
assert tbl.schema().identifier_field_names() == {"foo", "new_field", "new_field2"}
with tbl.update_schema(allow_incompatible_changes=True) as update_schema:
update_schema.set_identifier_fields()
assert tbl.schema().identifier_field_names() == set()
@pytest.mark.integration
def test_set_identifier_field_fails_schema(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
NestedField(field_id=2, name="float", field_type=FloatType(), required=True),
NestedField(field_id=3, name="double", field_type=DoubleType(), required=True),
identifier_field_ids=[],
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update_schema:
update_schema.set_identifier_fields("id")
assert "Identifier field 1 invalid: not a required field" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update_schema:
update_schema.set_identifier_fields("float")
assert "Identifier field 2 invalid: must not be float or double field" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update_schema:
update_schema.set_identifier_fields("double")
assert "Identifier field 3 invalid: must not be float or double field" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as update_schema:
update_schema.set_identifier_fields("unknown")
assert "Cannot find identifier field unknown. In case of deletion, update the identifier fields first." in str(exc_info.value)
@pytest.mark.integration
def test_set_identifier_field_fails(nested_table: Table) -> None:
with pytest.raises(ValueError) as exc_info:
with nested_table.update_schema() as update_schema:
update_schema.set_identifier_fields("location")
assert "Identifier field 6 invalid: not a primitive type field" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with nested_table.update_schema() as update_schema:
update_schema.set_identifier_fields("baz")
assert "Identifier field 3 invalid: not a required field" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
with nested_table.update_schema() as update_schema:
update_schema.set_identifier_fields("person.name")
assert "Identifier field 16 invalid: not a required field" in str(exc_info.value)
@pytest.mark.integration
def test_delete_identifier_field_columns(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema() as schema_update:
schema_update.delete_column("foo")
schema_update.set_identifier_fields()
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema() as schema_update:
schema_update.set_identifier_fields()
schema_update.delete_column("foo")
@pytest.mark.integration
def test_delete_containing_nested_identifier_field_columns_fails(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema(allow_incompatible_changes=True) as schema_update:
schema_update.add_column(
"out", StructType(NestedField(field_id=3, name="nested", field_type=StringType(), required=True)), required=True
)
schema_update.set_identifier_fields("out.nested")
assert tbl.schema() == Schema(
NestedField(field_id=1, name="foo", field_type=StringType(), required=True),
NestedField(
field_id=2,
name="out",
field_type=StructType(NestedField(field_id=3, name="nested", field_type=StringType(), required=True)),
required=True,
),
identifier_field_ids=[3],
)
with pytest.raises(ValueError) as exc_info:
with tbl.update_schema() as schema_update:
schema_update.delete_column("out")
assert "Cannot find identifier field out.nested. In case of deletion, update the identifier fields first." in str(exc_info)
@pytest.mark.integration
def test_rename_identifier_fields(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(NestedField(field_id=1, name="foo", field_type=StringType(), required=True), identifier_field_ids=[1]),
)
with tbl.update_schema() as schema_update:
schema_update.rename_column("foo", "bar")
assert tbl.schema().identifier_field_ids == [1]
assert tbl.schema().identifier_field_names() == {"bar"}
@pytest.mark.integration
def test_move_identifier_fields(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
identifier_field_ids=[1],
),
)
with tbl.update_schema() as update:
update.move_before("data", "id")
assert tbl.schema().identifier_field_ids == [1]
assert tbl.schema().identifier_field_names() == {"id"}
with tbl.update_schema() as update:
update.move_after("id", "data")
assert tbl.schema().identifier_field_ids == [1]
assert tbl.schema().identifier_field_names() == {"id"}
with tbl.update_schema() as update:
update.move_first("data")
assert tbl.schema().identifier_field_ids == [1]
assert tbl.schema().identifier_field_names() == {"id"}
@pytest.mark.integration
def test_move_identifier_fields_case_insensitive(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="id", field_type=LongType(), required=True),
NestedField(field_id=2, name="data", field_type=StringType(), required=True),
identifier_field_ids=[1],
),
)
with tbl.update_schema(case_sensitive=False) as update:
update.move_before("DATA", "ID")
assert tbl.schema().identifier_field_ids == [1]
assert tbl.schema().identifier_field_names() == {"id"}
with tbl.update_schema(case_sensitive=False) as update:
update.move_after("ID", "DATA")
assert tbl.schema().identifier_field_ids == [1]
assert tbl.schema().identifier_field_names() == {"id"}
with tbl.update_schema(case_sensitive=False) as update:
update.move_first("DATA")
assert tbl.schema().identifier_field_ids == [1]
assert tbl.schema().identifier_field_names() == {"id"}
@pytest.mark.integration
def test_two_add_schemas_in_a_single_transaction(catalog: Catalog) -> None:
tbl = _create_table_with_schema(
catalog,
Schema(
NestedField(field_id=1, name="foo", field_type=StringType()),
),
)
with pytest.raises(ValueError) as exc_info:
with tbl.transaction() as tr:
with tr.update_schema() as update:
update.add_column("bar", field_type=StringType())
with tr.update_schema() as update:
update.add_column("baz", field_type=StringType())
assert "Updates in a single commit need to be unique, duplicate: <class 'pyiceberg.table.AddSchemaUpdate'>" in str(
exc_info.value
)