| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint:disable=redefined-outer-name |
| import json |
| import uuid |
| from copy import copy |
| from typing import Any, Dict |
| |
| import pytest |
| from pydantic import ValidationError |
| from sortedcontainers import SortedList |
| |
| from pyiceberg.catalog.noop import NoopCatalog |
| from pyiceberg.exceptions import CommitFailedException |
| from pyiceberg.expressions import ( |
| AlwaysTrue, |
| And, |
| EqualTo, |
| In, |
| ) |
| from pyiceberg.io import PY_IO_IMPL, load_file_io |
| from pyiceberg.manifest import ( |
| DataFile, |
| DataFileContent, |
| FileFormat, |
| ManifestEntry, |
| ManifestEntryStatus, |
| ) |
| from pyiceberg.partitioning import PartitionField, PartitionSpec |
| from pyiceberg.schema import Schema |
| from pyiceberg.table import ( |
| CommitTableRequest, |
| StaticTable, |
| Table, |
| TableIdentifier, |
| _match_deletes_to_data_file, |
| ) |
| from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id |
| from pyiceberg.table.refs import SnapshotRef |
| from pyiceberg.table.snapshots import ( |
| MetadataLogEntry, |
| Operation, |
| Snapshot, |
| SnapshotLogEntry, |
| Summary, |
| ancestors_of, |
| ) |
| from pyiceberg.table.sorting import ( |
| NullOrder, |
| SortDirection, |
| SortField, |
| SortOrder, |
| ) |
| from pyiceberg.table.statistics import BlobMetadata, StatisticsFile |
| from pyiceberg.table.update import ( |
| AddSnapshotUpdate, |
| AddSortOrderUpdate, |
| AssertCreate, |
| AssertCurrentSchemaId, |
| AssertDefaultSortOrderId, |
| AssertDefaultSpecId, |
| AssertLastAssignedFieldId, |
| AssertLastAssignedPartitionId, |
| AssertRefSnapshotId, |
| AssertTableUUID, |
| RemovePropertiesUpdate, |
| RemoveSnapshotRefUpdate, |
| RemoveSnapshotsUpdate, |
| RemoveStatisticsUpdate, |
| SetDefaultSortOrderUpdate, |
| SetPropertiesUpdate, |
| SetSnapshotRefUpdate, |
| SetStatisticsUpdate, |
| _apply_table_update, |
| _TableMetadataUpdateContext, |
| update_table_metadata, |
| ) |
| from pyiceberg.table.update.schema import UpdateSchema |
| from pyiceberg.transforms import ( |
| BucketTransform, |
| IdentityTransform, |
| ) |
| from pyiceberg.types import ( |
| BinaryType, |
| BooleanType, |
| DateType, |
| DecimalType, |
| DoubleType, |
| FixedType, |
| FloatType, |
| IntegerType, |
| ListType, |
| LongType, |
| MapType, |
| NestedField, |
| PrimitiveType, |
| StringType, |
| StructType, |
| TimestampType, |
| TimestamptzType, |
| TimeType, |
| UUIDType, |
| ) |
| |
| |
| def test_schema(table_v2: Table) -> None: |
| assert table_v2.schema() == Schema( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), |
| NestedField(field_id=3, name="z", field_type=LongType(), required=True), |
| identifier_field_ids=[1, 2], |
| ) |
| assert table_v2.schema().schema_id == 1 |
| |
| |
| def test_schemas(table_v2: Table) -> None: |
| assert table_v2.schemas() == { |
| 0: Schema( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| identifier_field_ids=[], |
| ), |
| 1: Schema( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), |
| NestedField(field_id=3, name="z", field_type=LongType(), required=True), |
| identifier_field_ids=[1, 2], |
| ), |
| } |
| assert table_v2.schemas()[0].schema_id == 0 |
| assert table_v2.schemas()[1].schema_id == 1 |
| |
| |
| def test_spec(table_v2: Table) -> None: |
| assert table_v2.spec() == PartitionSpec( |
| PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="x"), spec_id=0 |
| ) |
| |
| |
| def test_specs(table_v2: Table) -> None: |
| assert table_v2.specs() == { |
| 0: PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="x"), spec_id=0) |
| } |
| |
| |
| def test_sort_order(table_v2: Table) -> None: |
| assert table_v2.sort_order() == SortOrder( |
| SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), |
| SortField( |
| source_id=3, |
| transform=BucketTransform(num_buckets=4), |
| direction=SortDirection.DESC, |
| null_order=NullOrder.NULLS_LAST, |
| ), |
| order_id=3, |
| ) |
| |
| |
| def test_sort_orders(table_v2: Table) -> None: |
| assert table_v2.sort_orders() == { |
| 3: SortOrder( |
| SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), |
| SortField( |
| source_id=3, |
| transform=BucketTransform(num_buckets=4), |
| direction=SortDirection.DESC, |
| null_order=NullOrder.NULLS_LAST, |
| ), |
| order_id=3, |
| ) |
| } |
| |
| |
| def test_location(table_v2: Table) -> None: |
| assert table_v2.location() == "s3://bucket/test/location" |
| |
| |
| def test_current_snapshot(table_v2: Table) -> None: |
| assert table_v2.current_snapshot() == Snapshot( |
| snapshot_id=3055729675574597004, |
| parent_snapshot_id=3051729675574597004, |
| sequence_number=1, |
| timestamp_ms=1555100955770, |
| manifest_list="s3://a/b/2.avro", |
| summary=Summary(operation=Operation.APPEND), |
| schema_id=1, |
| ) |
| |
| |
| def test_snapshot_by_id(table_v2: Table) -> None: |
| assert table_v2.snapshot_by_id(3055729675574597004) == Snapshot( |
| snapshot_id=3055729675574597004, |
| parent_snapshot_id=3051729675574597004, |
| sequence_number=1, |
| timestamp_ms=1555100955770, |
| manifest_list="s3://a/b/2.avro", |
| summary=Summary(operation=Operation.APPEND), |
| schema_id=1, |
| ) |
| |
| |
| def test_snapshot_by_timestamp(table_v2: Table) -> None: |
| assert table_v2.snapshot_as_of_timestamp(1515100955770) == Snapshot( |
| snapshot_id=3051729675574597004, |
| parent_snapshot_id=None, |
| sequence_number=0, |
| timestamp_ms=1515100955770, |
| manifest_list="s3://a/b/1.avro", |
| summary=Summary(Operation.APPEND), |
| schema_id=None, |
| ) |
| assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None |
| |
| |
| def test_ancestors_of(table_v2: Table) -> None: |
| assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [ |
| Snapshot( |
| snapshot_id=3055729675574597004, |
| parent_snapshot_id=3051729675574597004, |
| sequence_number=1, |
| timestamp_ms=1555100955770, |
| manifest_list="s3://a/b/2.avro", |
| summary=Summary(Operation.APPEND), |
| schema_id=1, |
| ), |
| Snapshot( |
| snapshot_id=3051729675574597004, |
| parent_snapshot_id=None, |
| sequence_number=0, |
| timestamp_ms=1515100955770, |
| manifest_list="s3://a/b/1.avro", |
| summary=Summary(Operation.APPEND), |
| schema_id=None, |
| ), |
| ] |
| |
| |
| def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None: |
    # Guard against RecursionError (maximum recursion depth exceeded) when walking a long snapshot ancestry
| assert ( |
| len( |
| list( |
| ancestors_of( |
| table_v2_with_extensive_snapshots.current_snapshot(), |
| table_v2_with_extensive_snapshots.metadata, |
| ) |
| ) |
| ) |
| == 2000 |
| ) |
| |
| |
| def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None: |
| assert table_v2.snapshot_by_id(-1) is None |
| |
| |
| def test_snapshot_by_name(table_v2: Table) -> None: |
| assert table_v2.snapshot_by_name("test") == Snapshot( |
| snapshot_id=3051729675574597004, |
| parent_snapshot_id=None, |
| sequence_number=0, |
| timestamp_ms=1515100955770, |
| manifest_list="s3://a/b/1.avro", |
| summary=Summary(operation=Operation.APPEND), |
| schema_id=None, |
| ) |
| |
| |
| def test_snapshot_by_name_does_not_exist(table_v2: Table) -> None: |
| assert table_v2.snapshot_by_name("doesnotexist") is None |
| |
| |
| def test_repr(table_v2: Table) -> None: |
| expected = """table( |
| 1: x: required long, |
| 2: y: required long (comment), |
| 3: z: required long |
| ), |
| partition by: [x], |
| sort order: [2 ASC NULLS FIRST, bucket[4](3) DESC NULLS LAST], |
| snapshot: Operation.APPEND: id=3055729675574597004, parent_id=3051729675574597004, schema_id=1""" |
| assert repr(table_v2) == expected |
| |
| |
| def test_history(table_v2: Table) -> None: |
| assert table_v2.history() == [ |
| SnapshotLogEntry(snapshot_id=3051729675574597004, timestamp_ms=1515100955770), |
| SnapshotLogEntry(snapshot_id=3055729675574597004, timestamp_ms=1555100955770), |
| ] |
| |
| |
| def test_table_scan_select(table_v2: Table) -> None: |
| scan = table_v2.scan() |
| assert scan.selected_fields == ("*",) |
| assert scan.select("a", "b").selected_fields == ("a", "b") |
| assert scan.select("a", "c").select("a").selected_fields == ("a",) |
| |
| |
| def test_table_scan_row_filter(table_v2: Table) -> None: |
| scan = table_v2.scan() |
| assert scan.row_filter == AlwaysTrue() |
| assert scan.filter(EqualTo("x", 10)).row_filter == EqualTo("x", 10) |
| assert scan.filter(EqualTo("x", 10)).filter(In("y", (10, 11))).row_filter == And(EqualTo("x", 10), In("y", (10, 11))) |
| |
| |
| def test_table_scan_ref(table_v2: Table) -> None: |
| scan = table_v2.scan() |
| assert scan.use_ref("test").snapshot_id == 3051729675574597004 |
| |
| |
def test_table_scan_ref_does_not_exist(table_v2: Table) -> None:
| scan = table_v2.scan() |
| |
| with pytest.raises(ValueError) as exc_info: |
| _ = scan.use_ref("boom") |
| |
| assert "Cannot scan unknown ref=boom" in str(exc_info.value) |
| |
| |
| def test_table_scan_projection_full_schema(table_v2: Table) -> None: |
| scan = table_v2.scan() |
| projection_schema = scan.select("x", "y", "z").projection() |
| assert projection_schema == Schema( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), |
| NestedField(field_id=3, name="z", field_type=LongType(), required=True), |
| identifier_field_ids=[1, 2], |
| ) |
| assert projection_schema.schema_id == 1 |
| |
| |
| def test_table_scan_projection_single_column(table_v2: Table) -> None: |
| scan = table_v2.scan() |
| projection_schema = scan.select("y").projection() |
| assert projection_schema == Schema( |
| NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), |
| identifier_field_ids=[2], |
| ) |
| assert projection_schema.schema_id == 1 |
| |
| |
| def test_table_scan_projection_single_column_case_sensitive(table_v2: Table) -> None: |
| scan = table_v2.scan() |
| projection_schema = scan.with_case_sensitive(False).select("Y").projection() |
| assert projection_schema == Schema( |
| NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), |
| identifier_field_ids=[2], |
| ) |
| assert projection_schema.schema_id == 1 |
| |
| |
| def test_table_scan_projection_unknown_column(table_v2: Table) -> None: |
| scan = table_v2.scan() |
| |
| with pytest.raises(ValueError) as exc_info: |
| _ = scan.select("a").projection() |
| |
| assert "Could not find column: 'a'" in str(exc_info.value) |
| |
| |
| def test_static_table_same_as_table(table_v2: Table, metadata_location: str) -> None: |
| static_table = StaticTable.from_metadata(metadata_location) |
| assert isinstance(static_table, Table) |
| assert static_table.metadata == table_v2.metadata |
| |
| |
| def test_static_table_gz_same_as_table(table_v2: Table, metadata_location_gz: str) -> None: |
| static_table = StaticTable.from_metadata(metadata_location_gz) |
| assert isinstance(static_table, Table) |
| assert static_table.metadata == table_v2.metadata |
| |
| |
| def test_static_table_version_hint_same_as_table(table_v2: Table, table_location: str) -> None: |
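    # Given a table location rather than a metadata file, StaticTable is expected to resolve the metadata via the version hint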
| static_table = StaticTable.from_metadata(table_location) |
| assert isinstance(static_table, Table) |
| assert static_table.metadata == table_v2.metadata |
| |
| |
| def test_static_table_io_does_not_exist(metadata_location: str) -> None: |
| with pytest.raises(ValueError): |
| StaticTable.from_metadata(metadata_location, {PY_IO_IMPL: "pyiceberg.does.not.exist.FileIO"}) |
| |
| |
| def test_match_deletes_to_datafile() -> None: |
| data_entry = ManifestEntry.from_args( |
| status=ManifestEntryStatus.ADDED, |
| sequence_number=1, |
| data_file=DataFile.from_args( |
| content=DataFileContent.DATA, |
| file_path="s3://bucket/0000.parquet", |
| file_format=FileFormat.PARQUET, |
| partition={}, |
| record_count=3, |
| file_size_in_bytes=3, |
| ), |
| ) |
| delete_entry_1 = ManifestEntry.from_args( |
| status=ManifestEntryStatus.ADDED, |
| sequence_number=0, # Older than the data |
| data_file=DataFile.from_args( |
| content=DataFileContent.POSITION_DELETES, |
| file_path="s3://bucket/0001-delete.parquet", |
| file_format=FileFormat.PARQUET, |
| partition={}, |
| record_count=3, |
| file_size_in_bytes=3, |
| ), |
| ) |
| delete_entry_2 = ManifestEntry.from_args( |
| status=ManifestEntryStatus.ADDED, |
| sequence_number=3, |
| data_file=DataFile.from_args( |
| content=DataFileContent.POSITION_DELETES, |
| file_path="s3://bucket/0002-delete.parquet", |
| file_format=FileFormat.PARQUET, |
| partition={}, |
| record_count=3, |
| file_size_in_bytes=3, |
            # The actual statistics values are irrelevant for this test
| value_counts={}, |
| null_value_counts={}, |
| nan_value_counts={}, |
| lower_bounds={}, |
| upper_bounds={}, |
| ), |
| ) |
| assert _match_deletes_to_data_file( |
| data_entry, |
| SortedList(iterable=[delete_entry_1, delete_entry_2], key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER), |
| ) == { |
| delete_entry_2.data_file, |
| } |
| |
| |
| def test_match_deletes_to_datafile_duplicate_number() -> None: |
| data_entry = ManifestEntry.from_args( |
| status=ManifestEntryStatus.ADDED, |
| sequence_number=1, |
| data_file=DataFile.from_args( |
| content=DataFileContent.DATA, |
| file_path="s3://bucket/0000.parquet", |
| file_format=FileFormat.PARQUET, |
| partition={}, |
| record_count=3, |
| file_size_in_bytes=3, |
| ), |
| ) |
| delete_entry_1 = ManifestEntry.from_args( |
| status=ManifestEntryStatus.ADDED, |
| sequence_number=3, |
| data_file=DataFile.from_args( |
| content=DataFileContent.POSITION_DELETES, |
| file_path="s3://bucket/0001-delete.parquet", |
| file_format=FileFormat.PARQUET, |
| partition={}, |
| record_count=3, |
| file_size_in_bytes=3, |
            # The actual statistics values are irrelevant for this test
| value_counts={}, |
| null_value_counts={}, |
| nan_value_counts={}, |
| lower_bounds={}, |
| upper_bounds={}, |
| ), |
| ) |
| delete_entry_2 = ManifestEntry.from_args( |
| status=ManifestEntryStatus.ADDED, |
| sequence_number=3, |
| data_file=DataFile.from_args( |
| content=DataFileContent.POSITION_DELETES, |
| file_path="s3://bucket/0002-delete.parquet", |
| file_format=FileFormat.PARQUET, |
| partition={}, |
| record_count=3, |
| file_size_in_bytes=3, |
            # The actual statistics values are irrelevant for this test
| value_counts={}, |
| null_value_counts={}, |
| nan_value_counts={}, |
| lower_bounds={}, |
| upper_bounds={}, |
| ), |
| ) |
| assert _match_deletes_to_data_file( |
| data_entry, |
| SortedList(iterable=[delete_entry_1, delete_entry_2], key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER), |
| ) == { |
| delete_entry_1.data_file, |
| delete_entry_2.data_file, |
| } |
| |
| |
| def test_serialize_set_properties_updates() -> None: |
| assert ( |
| SetPropertiesUpdate(updates={"abc": "🤪"}).model_dump_json() == """{"action":"set-properties","updates":{"abc":"🤪"}}""" |
| ) |
| |
| |
| def test_add_column(table_v2: Table) -> None: |
| update = UpdateSchema(transaction=table_v2.transaction()) |
| update.add_column(path="b", field_type=IntegerType()) |
| apply_schema: Schema = update._apply() # pylint: disable=W0212 |
| assert len(apply_schema.fields) == 4 |
| |
| assert apply_schema == Schema( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), |
| NestedField(field_id=3, name="z", field_type=LongType(), required=True), |
| NestedField(field_id=4, name="b", field_type=IntegerType(), required=False), |
| identifier_field_ids=[1, 2], |
| ) |
| assert apply_schema.schema_id == 2 |
| assert apply_schema.highest_field_id == 4 |
| |
| |
| def test_update_column(table_v1: Table, table_v2: Table) -> None: |
| """ |
| Table should be able to update existing property `doc` |
| Table should also be able to update property `required`, if the field is not an identifier field. |
| """ |
| COMMENT2 = "comment2" |
| for table in [table_v1, table_v2]: |
| original_schema = table.schema() |
| # update existing doc to a new doc |
| assert original_schema.find_field("y").doc == "comment" |
| new_schema = table.transaction().update_schema().update_column("y", doc=COMMENT2)._apply() |
| assert new_schema.find_field("y").doc == COMMENT2, "failed to update existing field doc" |
| |
| # update existing doc to an empty string |
| assert new_schema.find_field("y").doc == COMMENT2 |
| new_schema2 = table.transaction().update_schema().update_column("y", doc="")._apply() |
| assert new_schema2.find_field("y").doc == "", "failed to remove existing field doc" |
| |
| # update required to False |
| assert original_schema.find_field("z").required is True |
| new_schema3 = table.transaction().update_schema().update_column("z", required=False)._apply() |
| assert new_schema3.find_field("z").required is False, "failed to update existing field required" |
| |
        # assert that the above three updates also work with union_by_name
| assert ( |
| table.update_schema().union_by_name(new_schema)._apply() == new_schema |
| ), "failed to update existing field doc with union_by_name" |
| assert ( |
| table.update_schema().union_by_name(new_schema2)._apply() == new_schema2 |
| ), "failed to remove existing field doc with union_by_name" |
| assert ( |
| table.update_schema().union_by_name(new_schema3)._apply() == new_schema3 |
| ), "failed to update existing field required with union_by_name" |
| |
| |
| def test_add_primitive_type_column(table_v2: Table) -> None: |
    primitive_types: Dict[str, PrimitiveType] = {
| "boolean": BooleanType(), |
| "int": IntegerType(), |
| "long": LongType(), |
| "float": FloatType(), |
| "double": DoubleType(), |
| "date": DateType(), |
| "time": TimeType(), |
| "timestamp": TimestampType(), |
| "timestamptz": TimestamptzType(), |
| "string": StringType(), |
| "uuid": UUIDType(), |
| "binary": BinaryType(), |
| } |
| |
    for name, type_ in primitive_types.items():
| field_name = f"new_column_{name}" |
| update = UpdateSchema(transaction=table_v2.transaction()) |
| update.add_column(path=field_name, field_type=type_, doc=f"new_column_{name}") |
| new_schema = update._apply() # pylint: disable=W0212 |
| |
| field: NestedField = new_schema.find_field(field_name) |
| assert field.field_type == type_ |
| assert field.doc == f"new_column_{name}" |
| |
| |
| def test_add_nested_type_column(table_v2: Table) -> None: |
| # add struct type column |
| field_name = "new_column_struct" |
| update = UpdateSchema(transaction=table_v2.transaction()) |
| struct_ = StructType( |
| NestedField(1, "lat", DoubleType()), |
| NestedField(2, "long", DoubleType()), |
| ) |
| update.add_column(path=field_name, field_type=struct_) |
| schema_ = update._apply() # pylint: disable=W0212 |
| field: NestedField = schema_.find_field(field_name) |
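    # The field ids requested in the struct (1, 2) are reassigned to the table's next available ids (5, 6)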
| assert field.field_type == StructType( |
| NestedField(5, "lat", DoubleType()), |
| NestedField(6, "long", DoubleType()), |
| ) |
| assert schema_.highest_field_id == 6 |
| |
| |
| def test_add_nested_map_type_column(table_v2: Table) -> None: |
| # add map type column |
| field_name = "new_column_map" |
| update = UpdateSchema(transaction=table_v2.transaction()) |
| map_ = MapType(1, StringType(), 2, IntegerType(), False) |
| update.add_column(path=field_name, field_type=map_) |
| new_schema = update._apply() # pylint: disable=W0212 |
| field: NestedField = new_schema.find_field(field_name) |
| assert field.field_type == MapType(5, StringType(), 6, IntegerType(), False) |
| assert new_schema.highest_field_id == 6 |
| |
| |
| def test_add_nested_list_type_column(table_v2: Table) -> None: |
| # add list type column |
| field_name = "new_column_list" |
| update = UpdateSchema(transaction=table_v2.transaction()) |
| list_ = ListType( |
| element_id=101, |
| element_type=StructType( |
| NestedField(102, "lat", DoubleType()), |
| NestedField(103, "long", DoubleType()), |
| ), |
| element_required=False, |
| ) |
| update.add_column(path=field_name, field_type=list_) |
| new_schema = update._apply() # pylint: disable=W0212 |
| field: NestedField = new_schema.find_field(field_name) |
| assert field.field_type == ListType( |
| element_id=5, |
| element_type=StructType( |
| NestedField(6, "lat", DoubleType()), |
| NestedField(7, "long", DoubleType()), |
| ), |
| element_required=False, |
| ) |
| assert new_schema.highest_field_id == 7 |
| |
| |
| def test_apply_set_properties_update(table_v2: Table) -> None: |
| base_metadata = table_v2.metadata |
| |
| new_metadata_no_update = update_table_metadata(base_metadata, (SetPropertiesUpdate(updates={}),)) |
| assert new_metadata_no_update == base_metadata |
| |
| new_metadata = update_table_metadata( |
| base_metadata, (SetPropertiesUpdate(updates={"read.split.target.size": "123", "test_a": "test_a", "test_b": "test_b"}),) |
| ) |
| |
| assert base_metadata.properties == {"read.split.target.size": "134217728"} |
| assert new_metadata.properties == {"read.split.target.size": "123", "test_a": "test_a", "test_b": "test_b"} |
| |
| new_metadata_add_only = update_table_metadata(new_metadata, (SetPropertiesUpdate(updates={"test_c": "test_c"}),)) |
| |
| assert new_metadata_add_only.properties == { |
| "read.split.target.size": "123", |
| "test_a": "test_a", |
| "test_b": "test_b", |
| "test_c": "test_c", |
| } |
| assert new_metadata_add_only.last_updated_ms > base_metadata.last_updated_ms |
| |
| |
| def test_apply_remove_properties_update(table_v2: Table) -> None: |
| base_metadata = update_table_metadata( |
| table_v2.metadata, |
| (SetPropertiesUpdate(updates={"test_a": "test_a", "test_b": "test_b", "test_c": "test_c", "test_d": "test_d"}),), |
| ) |
| |
| new_metadata_no_removal = update_table_metadata(base_metadata, (RemovePropertiesUpdate(removals=[]),)) |
| |
| assert base_metadata == new_metadata_no_removal |
| |
| new_metadata = update_table_metadata(base_metadata, (RemovePropertiesUpdate(removals=["test_a", "test_c"]),)) |
| |
| assert base_metadata.properties == { |
| "read.split.target.size": "134217728", |
| "test_a": "test_a", |
| "test_b": "test_b", |
| "test_c": "test_c", |
| "test_d": "test_d", |
| } |
| assert new_metadata.properties == {"read.split.target.size": "134217728", "test_b": "test_b", "test_d": "test_d"} |
| |
| |
| def test_apply_add_schema_update(table_v2: Table) -> None: |
| transaction = table_v2.transaction() |
| update = transaction.update_schema() |
| update.add_column(path="b", field_type=IntegerType()) |
| update.commit() |
| |
| test_context = _TableMetadataUpdateContext() |
| |
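    # The first update adds the new schema; the current schema id is unchanged at this point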
| new_table_metadata = _apply_table_update(transaction._updates[0], base_metadata=table_v2.metadata, context=test_context) # pylint: disable=W0212 |
| assert len(new_table_metadata.schemas) == 3 |
| assert new_table_metadata.current_schema_id == 1 |
| assert len(test_context._updates) == 1 |
| assert test_context._updates[0] == transaction._updates[0] # pylint: disable=W0212 |
| assert test_context.is_added_schema(2) |
| |
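    # The second update makes the newly added schema (id 2) the current schema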
| new_table_metadata = _apply_table_update(transaction._updates[1], base_metadata=new_table_metadata, context=test_context) # pylint: disable=W0212 |
| assert len(new_table_metadata.schemas) == 3 |
| assert new_table_metadata.current_schema_id == 2 |
| assert len(test_context._updates) == 2 |
| assert test_context._updates[1] == transaction._updates[1] # pylint: disable=W0212 |
| assert test_context.is_added_schema(2) |
| |
| |
| def test_update_metadata_table_schema(table_v2: Table) -> None: |
| transaction = table_v2.transaction() |
| update = transaction.update_schema() |
| update.add_column(path="b", field_type=IntegerType()) |
| update.commit() |
| new_metadata = update_table_metadata(table_v2.metadata, transaction._updates) # pylint: disable=W0212 |
| apply_schema: Schema = next(schema for schema in new_metadata.schemas if schema.schema_id == 2) |
| assert len(apply_schema.fields) == 4 |
| |
| assert apply_schema == Schema( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), |
| NestedField(field_id=3, name="z", field_type=LongType(), required=True), |
| NestedField(field_id=4, name="b", field_type=IntegerType(), required=False), |
| identifier_field_ids=[1, 2], |
| ) |
| assert apply_schema.schema_id == 2 |
| assert apply_schema.highest_field_id == 4 |
| |
| assert new_metadata.current_schema_id == 2 |
| |
| |
| def test_update_metadata_add_snapshot(table_v2: Table) -> None: |
| new_snapshot = Snapshot( |
| snapshot_id=25, |
| parent_snapshot_id=19, |
| sequence_number=200, |
| timestamp_ms=1602638593590, |
| manifest_list="s3:/a/b/c.avro", |
| summary=Summary(Operation.APPEND), |
| schema_id=3, |
| ) |
| |
| new_metadata = update_table_metadata(table_v2.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) |
| assert len(new_metadata.snapshots) == 3 |
| assert new_metadata.snapshots[-1] == new_snapshot |
| assert new_metadata.last_sequence_number == new_snapshot.sequence_number |
| assert new_metadata.last_updated_ms == new_snapshot.timestamp_ms |
| |
| |
| def test_update_metadata_set_ref_snapshot(table_v2: Table) -> None: |
| update, _ = table_v2.transaction()._set_ref_snapshot( |
| snapshot_id=3051729675574597004, |
| ref_name="main", |
| type="branch", |
| max_ref_age_ms=123123123, |
| max_snapshot_age_ms=12312312312, |
| min_snapshots_to_keep=1, |
| ) |
| |
| new_metadata = update_table_metadata(table_v2.metadata, update) |
| assert len(new_metadata.snapshot_log) == 3 |
| assert new_metadata.snapshot_log[2].snapshot_id == 3051729675574597004 |
| assert new_metadata.current_snapshot_id == 3051729675574597004 |
| assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms |
| assert new_metadata.refs["main"] == SnapshotRef( |
| snapshot_id=3051729675574597004, |
| snapshot_ref_type="branch", |
| min_snapshots_to_keep=1, |
| max_snapshot_age_ms=12312312312, |
| max_ref_age_ms=123123123, |
| ) |
| |
| |
| def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None: |
| update = SetSnapshotRefUpdate( |
| ref_name="main", |
| type="branch", |
| snapshot_id=3051729675574597004, |
| max_ref_age_ms=123123123, |
| max_snapshot_age_ms=12312312312, |
| min_snapshots_to_keep=1, |
| ) |
| |
| new_metadata = update_table_metadata(table_v2.metadata, (update,)) |
| assert len(new_metadata.snapshot_log) == 3 |
| assert new_metadata.snapshot_log[2].snapshot_id == 3051729675574597004 |
| assert new_metadata.current_snapshot_id == 3051729675574597004 |
| assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms |
| assert new_metadata.refs[update.ref_name] == SnapshotRef( |
| snapshot_id=3051729675574597004, |
| snapshot_ref_type="branch", |
| min_snapshots_to_keep=1, |
| max_snapshot_age_ms=12312312312, |
| max_ref_age_ms=123123123, |
| ) |
| |
| |
| def test_update_remove_snapshots(table_v2: Table) -> None: |
| REMOVE_SNAPSHOT = 3051729675574597004 |
| KEEP_SNAPSHOT = 3055729675574597004 |
    # assert the fixture state up front to make the test assumptions explicit
| assert len(table_v2.metadata.snapshots) == 2 |
| assert len(table_v2.metadata.snapshot_log) == 2 |
| assert len(table_v2.metadata.refs) == 2 |
| update = RemoveSnapshotsUpdate(snapshot_ids=[REMOVE_SNAPSHOT]) |
| new_metadata = update_table_metadata(table_v2.metadata, (update,)) |
| assert len(new_metadata.snapshots) == 1 |
| assert new_metadata.snapshots[0].snapshot_id == KEEP_SNAPSHOT |
| assert new_metadata.snapshots[0].parent_snapshot_id is None |
| assert new_metadata.current_snapshot_id == KEEP_SNAPSHOT |
| assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms |
| assert len(new_metadata.snapshot_log) == 1 |
| assert new_metadata.snapshot_log[0].snapshot_id == KEEP_SNAPSHOT |
| assert len(new_metadata.refs) == 1 |
| assert new_metadata.refs["main"].snapshot_id == KEEP_SNAPSHOT |
| |
| |
| def test_update_remove_snapshots_doesnt_exist(table_v2: Table) -> None: |
| update = RemoveSnapshotsUpdate( |
| snapshot_ids=[123], |
| ) |
| with pytest.raises(ValueError, match="Snapshot with snapshot id 123 does not exist"): |
| update_table_metadata(table_v2.metadata, (update,)) |
| |
| |
| def test_update_remove_snapshots_remove_current_snapshot_id(table_v2: Table) -> None: |
| update = RemoveSnapshotsUpdate(snapshot_ids=[3055729675574597004]) |
| new_metadata = update_table_metadata(table_v2.metadata, (update,)) |
| assert len(new_metadata.refs) == 1 |
| assert new_metadata.refs["test"].snapshot_id == 3051729675574597004 |
| assert new_metadata.current_snapshot_id is None |
| |
| |
| def test_update_remove_snapshot_ref(table_v2: Table) -> None: |
    # assert the fixture state up front to make the test assumptions explicit
| assert len(table_v2.metadata.refs) == 2 |
| update = RemoveSnapshotRefUpdate(ref_name="test") |
| new_metadata = update_table_metadata(table_v2.metadata, (update,)) |
| assert len(new_metadata.refs) == 1 |
| assert new_metadata.refs["main"].snapshot_id == 3055729675574597004 |
| |
| |
| def test_update_metadata_add_update_sort_order(table_v2: Table) -> None: |
| new_sort_order = SortOrder(order_id=table_v2.sort_order().order_id + 1) |
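    # sort_order_id=-1 refers to the sort order added last within this set of updates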
| new_metadata = update_table_metadata( |
| table_v2.metadata, |
| (AddSortOrderUpdate(sort_order=new_sort_order), SetDefaultSortOrderUpdate(sort_order_id=-1)), |
| ) |
| assert len(new_metadata.sort_orders) == 2 |
| assert new_metadata.sort_orders[-1] == new_sort_order |
| assert new_metadata.default_sort_order_id == new_sort_order.order_id |
| assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms |
| |
| |
| def test_update_metadata_update_sort_order_invalid(table_v2: Table) -> None: |
| with pytest.raises(ValueError, match="Cannot set current sort order to the last added one when no sort order has been added"): |
| update_table_metadata(table_v2.metadata, (SetDefaultSortOrderUpdate(sort_order_id=-1),)) |
| |
| invalid_order_id = 10 |
| with pytest.raises(ValueError, match=f"Sort order with id {invalid_order_id} does not exist"): |
| update_table_metadata(table_v2.metadata, (SetDefaultSortOrderUpdate(sort_order_id=invalid_order_id),)) |
| |
| |
| def test_update_metadata_with_multiple_updates(table_v1: Table) -> None: |
| base_metadata = table_v1.metadata |
| transaction = table_v1.transaction() |
| transaction.upgrade_table_version(format_version=2) |
| |
| schema_update_1 = transaction.update_schema() |
| schema_update_1.add_column(path="b", field_type=IntegerType()) |
| schema_update_1.commit() |
| |
| transaction.set_properties(owner="test", test_a="test_a", test_b="test_b", test_c="test_c") |
| |
| test_updates = transaction._updates # pylint: disable=W0212 |
| |
| new_snapshot = Snapshot( |
| snapshot_id=25, |
| parent_snapshot_id=19, |
| sequence_number=200, |
| timestamp_ms=1602638573590, |
| manifest_list="s3:/a/b/c.avro", |
| summary=Summary(Operation.APPEND), |
| schema_id=3, |
| ) |
| |
| test_updates += ( |
| AddSnapshotUpdate(snapshot=new_snapshot), |
| SetPropertiesUpdate(updates={"test_a": "test_a1"}), |
| SetSnapshotRefUpdate( |
| ref_name="main", |
| type="branch", |
| snapshot_id=25, |
| max_ref_age_ms=123123123, |
| max_snapshot_age_ms=12312312312, |
| min_snapshots_to_keep=1, |
| ), |
| RemovePropertiesUpdate(removals=["test_c", "test_b"]), |
| ) |
| |
| new_metadata = update_table_metadata(base_metadata, test_updates) |
| # rebuild the metadata to trigger validation |
| new_metadata = TableMetadataUtil.parse_obj(copy(new_metadata.model_dump())) |
| |
| # UpgradeFormatVersionUpdate |
| assert new_metadata.format_version == 2 |
| assert isinstance(new_metadata, TableMetadataV2) |
| |
| # UpdateSchema |
| assert len(new_metadata.schemas) == 2 |
| assert new_metadata.current_schema_id == 1 |
| assert new_metadata.schema_by_id(new_metadata.current_schema_id).highest_field_id == 4 # type: ignore |
| |
    # AddSnapshotUpdate
| assert len(new_metadata.snapshots) == 2 |
| assert new_metadata.snapshots[-1] == new_snapshot |
| assert new_metadata.last_sequence_number == new_snapshot.sequence_number |
| assert new_metadata.last_updated_ms == new_snapshot.timestamp_ms |
| |
| # SetSnapshotRefUpdate |
| assert len(new_metadata.snapshot_log) == 1 |
| assert new_metadata.snapshot_log[0].snapshot_id == 25 |
| assert new_metadata.current_snapshot_id == 25 |
| assert new_metadata.last_updated_ms == 1602638573590 |
| assert new_metadata.refs["main"] == SnapshotRef( |
| snapshot_id=25, |
| snapshot_ref_type="branch", |
| min_snapshots_to_keep=1, |
| max_snapshot_age_ms=12312312312, |
| max_ref_age_ms=123123123, |
| ) |
| |
| # Set/RemovePropertiesUpdate |
| assert new_metadata.properties == {"owner": "test", "test_a": "test_a1"} |
| |
| |
| def test_update_metadata_schema_immutability( |
| table_v2_with_fixed_and_decimal_types: TableMetadataV2, |
| ) -> None: |
| update = SetSnapshotRefUpdate( |
| ref_name="main", |
| type="branch", |
| snapshot_id=3051729675574597004, |
| max_ref_age_ms=123123123, |
| max_snapshot_age_ms=12312312312, |
| min_snapshots_to_keep=1, |
| ) |
| |
| new_metadata = update_table_metadata( |
| table_v2_with_fixed_and_decimal_types.metadata, |
| (update,), |
| ) |
| |
| assert new_metadata.schemas[0].fields == ( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| NestedField(field_id=4, name="a", field_type=DecimalType(precision=16, scale=2), required=True), |
| NestedField(field_id=5, name="b", field_type=DecimalType(precision=16, scale=8), required=True), |
| NestedField(field_id=6, name="c", field_type=FixedType(length=16), required=True), |
| NestedField(field_id=7, name="d", field_type=FixedType(length=18), required=True), |
| ) |
| |
| |
| def test_metadata_isolation_from_illegal_updates(table_v1: Table) -> None: |
| base_metadata = table_v1.metadata |
| base_metadata_backup = base_metadata.model_copy(deep=True) |
| |
| # Apply legal updates on the table metadata |
| transaction = table_v1.transaction() |
| schema_update_1 = transaction.update_schema() |
| schema_update_1.add_column(path="b", field_type=IntegerType()) |
| schema_update_1.commit() |
| test_updates = transaction._updates # pylint: disable=W0212 |
| new_snapshot = Snapshot( |
| snapshot_id=25, |
| parent_snapshot_id=19, |
| sequence_number=200, |
| timestamp_ms=1602638573590, |
| manifest_list="s3:/a/b/c.avro", |
| summary=Summary(Operation.APPEND), |
| schema_id=3, |
| ) |
| test_updates += ( |
| AddSnapshotUpdate(snapshot=new_snapshot), |
| SetSnapshotRefUpdate( |
| ref_name="main", |
| type="branch", |
| snapshot_id=25, |
| max_ref_age_ms=123123123, |
| max_snapshot_age_ms=12312312312, |
| min_snapshots_to_keep=1, |
| ), |
| ) |
| new_metadata = update_table_metadata(base_metadata, test_updates) |
| |
| # Check that the original metadata is not modified |
| assert base_metadata == base_metadata_backup |
| |
    # Perform an illegal update on the new metadata:
    # TableMetadata should be immutable, but pydantic's frozen config cannot prevent
    # in-place operations such as list append.
| new_metadata.partition_specs.append(PartitionSpec(spec_id=0)) |
| assert len(new_metadata.partition_specs) == 2 |
| |
| # The original metadata should not be affected by the illegal update on the new metadata |
| assert len(base_metadata.partition_specs) == 1 |
| |
| |
| def test_generate_snapshot_id(table_v2: Table) -> None: |
| assert isinstance(_generate_snapshot_id(), int) |
| assert isinstance(table_v2.metadata.new_snapshot_id(), int) |
| |
| |
| def test_assert_create(table_v2: Table) -> None: |
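    # Validation succeeds while the table does not exist yet (no metadata)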
| AssertCreate().validate(None) |
| |
| with pytest.raises(CommitFailedException, match="Table already exists"): |
| AssertCreate().validate(table_v2.metadata) |
| |
| |
| def test_assert_table_uuid(table_v2: Table) -> None: |
| base_metadata = table_v2.metadata |
| AssertTableUUID(uuid=base_metadata.table_uuid).validate(base_metadata) |
| |
| with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): |
| AssertTableUUID(uuid=uuid.UUID("9c12d441-03fe-4693-9a96-a0705ddf69c2")).validate(None) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Table UUID does not match: 9c12d441-03fe-4693-9a96-a0705ddf69c2 != 9c12d441-03fe-4693-9a96-a0705ddf69c1", |
| ): |
| AssertTableUUID(uuid=uuid.UUID("9c12d441-03fe-4693-9a96-a0705ddf69c2")).validate(base_metadata) |
| |
| |
| def test_assert_ref_snapshot_id(table_v2: Table) -> None: |
| base_metadata = table_v2.metadata |
| AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata) |
| |
| with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): |
| AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Requirement failed: branch main was created concurrently", |
| ): |
| AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004", |
| ): |
| AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Requirement failed: branch or tag not_exist is missing, expected 1", |
| ): |
| AssertRefSnapshotId(ref="not_exist", snapshot_id=1).validate(base_metadata) |
| |
| |
| def test_assert_last_assigned_field_id(table_v2: Table) -> None: |
| base_metadata = table_v2.metadata |
| AssertLastAssignedFieldId(last_assigned_field_id=base_metadata.last_column_id).validate(base_metadata) |
| |
| with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): |
| AssertLastAssignedFieldId(last_assigned_field_id=1).validate(None) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Requirement failed: last assigned field id has changed: expected 1, found 3", |
| ): |
| AssertLastAssignedFieldId(last_assigned_field_id=1).validate(base_metadata) |
| |
| |
| def test_assert_current_schema_id(table_v2: Table) -> None: |
| base_metadata = table_v2.metadata |
| AssertCurrentSchemaId(current_schema_id=base_metadata.current_schema_id).validate(base_metadata) |
| |
| with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): |
| AssertCurrentSchemaId(current_schema_id=1).validate(None) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Requirement failed: current schema id has changed: expected 2, found 1", |
| ): |
| AssertCurrentSchemaId(current_schema_id=2).validate(base_metadata) |
| |
| |
def test_assert_last_assigned_partition_id(table_v2: Table) -> None:
| base_metadata = table_v2.metadata |
| AssertLastAssignedPartitionId(last_assigned_partition_id=base_metadata.last_partition_id).validate(base_metadata) |
| |
| with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): |
| AssertLastAssignedPartitionId(last_assigned_partition_id=1).validate(None) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Requirement failed: last assigned partition id has changed: expected 1, found 1000", |
| ): |
| AssertLastAssignedPartitionId(last_assigned_partition_id=1).validate(base_metadata) |
| |
| |
| def test_assert_default_spec_id(table_v2: Table) -> None: |
| base_metadata = table_v2.metadata |
| AssertDefaultSpecId(default_spec_id=base_metadata.default_spec_id).validate(base_metadata) |
| |
| with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): |
| AssertDefaultSpecId(default_spec_id=1).validate(None) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Requirement failed: default spec id has changed: expected 1, found 0", |
| ): |
| AssertDefaultSpecId(default_spec_id=1).validate(base_metadata) |
| |
| |
| def test_assert_default_sort_order_id(table_v2: Table) -> None: |
| base_metadata = table_v2.metadata |
| AssertDefaultSortOrderId(default_sort_order_id=base_metadata.default_sort_order_id).validate(base_metadata) |
| |
| with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"): |
| AssertDefaultSortOrderId(default_sort_order_id=1).validate(None) |
| |
| with pytest.raises( |
| CommitFailedException, |
| match="Requirement failed: default sort order id has changed: expected 1, found 3", |
| ): |
| AssertDefaultSortOrderId(default_sort_order_id=1).validate(base_metadata) |
| |
| |
| def test_correct_schema() -> None: |
| table_metadata = TableMetadataV2( |
| **{ |
| "format-version": 2, |
| "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", |
| "location": "s3://bucket/test/location", |
| "last-sequence-number": 34, |
| "last-updated-ms": 1602638573590, |
| "last-column-id": 3, |
| "current-schema-id": 1, |
| "schemas": [ |
| {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]}, |
| { |
| "type": "struct", |
| "schema-id": 1, |
| "identifier-field-ids": [1, 2], |
| "fields": [ |
| {"id": 1, "name": "x", "required": True, "type": "long"}, |
| {"id": 2, "name": "y", "required": True, "type": "long"}, |
| {"id": 3, "name": "z", "required": True, "type": "long"}, |
| ], |
| }, |
| ], |
| "default-spec-id": 0, |
| "partition-specs": [ |
| {"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]} |
| ], |
| "last-partition-id": 1000, |
| "default-sort-order-id": 0, |
| "sort-orders": [], |
| "current-snapshot-id": 123, |
| "snapshots": [ |
| { |
| "snapshot-id": 234, |
| "timestamp-ms": 1515100955770, |
| "sequence-number": 0, |
| "summary": {"operation": "append"}, |
| "manifest-list": "s3://a/b/1.avro", |
| "schema-id": 10, |
| }, |
| { |
| "snapshot-id": 123, |
| "timestamp-ms": 1515100955770, |
| "sequence-number": 0, |
| "summary": {"operation": "append"}, |
| "manifest-list": "s3://a/b/1.avro", |
| "schema-id": 0, |
| }, |
| ], |
| } |
| ) |
| |
| t = Table( |
| identifier=("default", "t1"), |
| metadata=table_metadata, |
| metadata_location="s3://../..", |
| io=load_file_io(), |
| catalog=NoopCatalog("NoopCatalog"), |
| ) |
| |
    # Should use the current schema, instead of the one from the snapshot
| projection_schema = t.scan().projection() |
| assert projection_schema == Schema( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| NestedField(field_id=2, name="y", field_type=LongType(), required=True), |
| NestedField(field_id=3, name="z", field_type=LongType(), required=True), |
| identifier_field_ids=[1, 2], |
| ) |
| assert projection_schema.schema_id == 1 |
| |
    # When we explicitly scan a specific snapshot, we want the schema that is linked to that snapshot
| projection_schema = t.scan(snapshot_id=123).projection() |
| assert projection_schema == Schema( |
| NestedField(field_id=1, name="x", field_type=LongType(), required=True), |
| identifier_field_ids=[], |
| ) |
| assert projection_schema.schema_id == 0 |
| |
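    # Snapshot 234 references schema id 10, which is not present in the metadata, so a warning is raised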
| with pytest.warns(UserWarning, match="Metadata does not contain schema with id: 10"): |
| t.scan(snapshot_id=234).projection() |
| |
| # Invalid snapshot |
| with pytest.raises(ValueError) as exc_info: |
| _ = t.scan(snapshot_id=-1).projection() |
| |
| assert "Snapshot not found: -1" in str(exc_info.value) |
| |
| |
| def test_table_properties(example_table_metadata_v2: Dict[str, Any]) -> None: |
| # metadata properties are all strings |
| for k, v in example_table_metadata_v2["properties"].items(): |
| assert isinstance(k, str) |
| assert isinstance(v, str) |
| metadata = TableMetadataV2(**example_table_metadata_v2) |
| for k, v in metadata.properties.items(): |
| assert isinstance(k, str) |
| assert isinstance(v, str) |
| |
    # a property can be set to an int, but it is still serialized as a string
| property_with_int = {"property_name": 42} |
| new_example_table_metadata_v2 = {**example_table_metadata_v2, "properties": property_with_int} |
| assert isinstance(new_example_table_metadata_v2["properties"]["property_name"], int) |
| new_metadata = TableMetadataV2(**new_example_table_metadata_v2) |
| assert isinstance(new_metadata.properties["property_name"], str) |
| |
| |
| def test_table_properties_raise_for_none_value(example_table_metadata_v2: Dict[str, Any]) -> None: |
| property_with_none = {"property_name": None} |
| example_table_metadata_v2 = {**example_table_metadata_v2, "properties": property_with_none} |
| with pytest.raises(ValidationError) as exc_info: |
| TableMetadataV2(**example_table_metadata_v2) |
| assert "None type is not a supported value in properties: property_name" in str(exc_info.value) |
| |
| |
| def test_serialize_commit_table_request() -> None: |
| request = CommitTableRequest( |
| requirements=(AssertTableUUID(uuid="4bfd18a3-74c6-478e-98b1-71c4c32f4163"),), |
| identifier=TableIdentifier(namespace=["a"], name="b"), |
| ) |
| |
| deserialized_request = CommitTableRequest.model_validate_json(request.model_dump_json()) |
| assert request == deserialized_request |
| |
| |
| def test_update_metadata_log(table_v2: Table) -> None: |
| new_snapshot = Snapshot( |
| snapshot_id=25, |
| parent_snapshot_id=19, |
| sequence_number=200, |
| timestamp_ms=1602638593590, |
| manifest_list="s3:/a/b/c.avro", |
| summary=Summary(Operation.APPEND), |
| schema_id=3, |
| ) |
| |
| new_metadata = update_table_metadata( |
| table_v2.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),), False, table_v2.metadata_location |
| ) |
| assert len(new_metadata.metadata_log) == 2 |
| |
| |
| def test_update_metadata_log_overflow(table_v2: Table) -> None: |
| metadata_log = [ |
| MetadataLogEntry( |
| timestamp_ms=1602638593590 + i, |
| metadata_file=f"/path/to/metadata/{i}.json", |
| ) |
| for i in range(10) |
| ] |
| table_v2.metadata = table_v2.metadata.model_copy(update={"metadata_log": metadata_log, "last_updated_ms": 1602638593600}) |
| table_v2.metadata_location = "/path/to/metadata/10.json" |
| assert len(table_v2.metadata.metadata_log) == 10 |
| |
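    # write.metadata.previous-versions-max caps the metadata log, keeping only the newest entries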
| base_metadata = table_v2.metadata |
| new_metadata = update_table_metadata( |
| base_metadata, |
| (SetPropertiesUpdate(updates={"write.metadata.previous-versions-max": "5"}),), |
| False, |
| table_v2.metadata_location, |
| ) |
| assert len(new_metadata.metadata_log) == 5 |
| assert new_metadata.metadata_log[-1].metadata_file == "/path/to/metadata/10.json" |
| |
    # an invalid value of write.metadata.previous-versions-max ("0") falls back to keeping a single entry
| new_metadata = update_table_metadata( |
| base_metadata, |
| (SetPropertiesUpdate(updates={"write.metadata.previous-versions-max": "0"}),), |
| False, |
| table_v2.metadata_location, |
| ) |
| assert len(new_metadata.metadata_log) == 1 |
| |
| |
| def test_set_statistics_update(table_v2_with_statistics: Table) -> None: |
| snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id |
| |
| blob_metadata = BlobMetadata( |
| type="apache-datasketches-theta-v1", |
| snapshot_id=snapshot_id, |
| sequence_number=2, |
| fields=[1], |
| properties={"prop-key": "prop-value"}, |
| ) |
| |
| statistics_file = StatisticsFile( |
| snapshot_id=snapshot_id, |
| statistics_path="s3://bucket/warehouse/stats.puffin", |
| file_size_in_bytes=124, |
| file_footer_size_in_bytes=27, |
| blob_metadata=[blob_metadata], |
| ) |
| |
| update = SetStatisticsUpdate( |
| snapshot_id=snapshot_id, |
| statistics=statistics_file, |
| ) |
| |
| new_metadata = update_table_metadata( |
| table_v2_with_statistics.metadata, |
| (update,), |
| ) |
| |
| expected = """ |
| { |
| "snapshot-id": 3055729675574597004, |
| "statistics-path": "s3://bucket/warehouse/stats.puffin", |
| "file-size-in-bytes": 124, |
| "file-footer-size-in-bytes": 27, |
| "blob-metadata": [ |
| { |
| "type": "apache-datasketches-theta-v1", |
| "snapshot-id": 3055729675574597004, |
| "sequence-number": 2, |
| "fields": [ |
| 1 |
| ], |
| "properties": { |
| "prop-key": "prop-value" |
| } |
| } |
| ] |
| }""" |
| |
| assert len(new_metadata.statistics) == 2 |
| |
| updated_statistics = [stat for stat in new_metadata.statistics if stat.snapshot_id == snapshot_id] |
| |
| assert len(updated_statistics) == 1 |
| assert json.loads(updated_statistics[0].model_dump_json()) == json.loads(expected) |
| |
| |
| def test_remove_statistics_update(table_v2_with_statistics: Table) -> None: |
| update = RemoveStatisticsUpdate( |
| snapshot_id=3055729675574597004, |
| ) |
| |
| remove_metadata = update_table_metadata( |
| table_v2_with_statistics.metadata, |
| (update,), |
| ) |
| |
| assert len(remove_metadata.statistics) == 1 |
| |
| with pytest.raises( |
| ValueError, |
| match="Statistics with snapshot id 123456789 does not exist", |
| ): |
| update_table_metadata( |
| table_v2_with_statistics.metadata, |
| (RemoveStatisticsUpdate(snapshot_id=123456789),), |
| ) |