| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint: disable=W0212 |
| from typing import Any, Dict |
| |
| import pytest |
| |
| from pyiceberg.schema import Schema |
| from pyiceberg.types import ( |
| BinaryType, |
| BooleanType, |
| DateType, |
| DecimalType, |
| FixedType, |
| IntegerType, |
| ListType, |
| LongType, |
| MapType, |
| NestedField, |
| StringType, |
| StructType, |
| ) |
| from pyiceberg.utils.schema_conversion import AvroSchemaConversion |
| |
| |
def test_avro_to_iceberg(avro_schema_manifest_file_v1: Dict[str, Any]) -> None:
    """Convert the ``avro_schema_manifest_file_v1`` fixture and compare it with a hand-built Iceberg schema."""
    converted = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file_v1)

    # Struct used as the element type of the "partitions" list.
    partition_summary = StructType(
        NestedField(
            field_id=509,
            name="contains_null",
            field_type=BooleanType(),
            required=True,
            doc="True if any file has a null partition value",
        ),
        NestedField(
            field_id=518,
            name="contains_nan",
            field_type=BooleanType(),
            required=False,
            doc="True if any file has a nan partition value",
        ),
        NestedField(
            field_id=510,
            name="lower_bound",
            field_type=BinaryType(),
            required=False,
            doc="Partition lower bound for all files",
        ),
        NestedField(
            field_id=511,
            name="upper_bound",
            field_type=BinaryType(),
            required=False,
            doc="Partition upper bound for all files",
        ),
    )
    partitions_list = ListType(element_id=508, element_type=partition_summary, element_required=True)

    # Top-level fields, in the order the expected schema declares them.
    top_level_fields = [
        NestedField(
            field_id=500,
            name="manifest_path",
            field_type=StringType(),
            required=True,
            doc="Location URI with FS scheme",
        ),
        NestedField(
            field_id=501,
            name="manifest_length",
            field_type=LongType(),
            required=True,
            doc="Total file size in bytes",
        ),
        NestedField(
            field_id=502,
            name="partition_spec_id",
            field_type=IntegerType(),
            required=True,
            doc="Spec ID used to write",
        ),
        NestedField(
            field_id=503,
            name="added_snapshot_id",
            field_type=LongType(),
            required=False,
            doc="Snapshot ID that added the manifest",
        ),
        NestedField(
            field_id=504,
            name="added_data_files_count",
            field_type=IntegerType(),
            required=False,
            doc="Added entry count",
        ),
        NestedField(
            field_id=505,
            name="existing_data_files_count",
            field_type=IntegerType(),
            required=False,
            doc="Existing entry count",
        ),
        NestedField(
            field_id=506,
            name="deleted_data_files_count",
            field_type=IntegerType(),
            required=False,
            doc="Deleted entry count",
        ),
        NestedField(
            field_id=507,
            name="partitions",
            field_type=partitions_list,
            required=False,
            doc="Summary for each partition",
        ),
        NestedField(
            field_id=512,
            name="added_rows_count",
            field_type=LongType(),
            required=False,
            doc="Added rows count",
        ),
        NestedField(
            field_id=513,
            name="existing_rows_count",
            field_type=LongType(),
            required=False,
            doc="Existing rows count",
        ),
        NestedField(
            field_id=514,
            name="deleted_rows_count",
            field_type=LongType(),
            required=False,
            doc="Deleted rows count",
        ),
    ]
    wanted = Schema(*top_level_fields, schema_id=1, identifier_field_ids=[])

    assert converted == wanted
| |
| |
def test_avro_list_required_primitive() -> None:
    """An Avro array whose items are the bare name "string" converts to a required list of strings."""
    string_array = {
        "type": "array",
        "items": "string",
        "default": [],
        "element-id": 101,
    }
    record = {
        "type": "record",
        "name": "avro_schema",
        "fields": [
            {"name": "array_with_string", "type": string_array, "field-id": 100},
        ],
    }

    list_of_strings = ListType(element_id=101, element_type=StringType(), element_required=True)
    wanted = Schema(
        NestedField(field_id=100, name="array_with_string", field_type=list_of_strings, required=True),
        schema_id=1,
    )

    converted = AvroSchemaConversion().avro_to_iceberg(record)

    assert wanted == converted
| |
| |
def test_avro_list_wrapped_primitive() -> None:
    """An Avro array whose items are written as ``{"type": "string"}`` converts to a required list of strings."""
    string_array = {
        "type": "array",
        "items": {"type": "string"},
        "default": [],
        "element-id": 101,
    }
    record = {
        "type": "record",
        "name": "avro_schema",
        "fields": [
            {"name": "array_with_string", "type": string_array, "field-id": 100},
        ],
    }

    list_of_strings = ListType(element_id=101, element_type=StringType(), element_required=True)
    wanted = Schema(
        NestedField(field_id=100, name="array_with_string", field_type=list_of_strings, required=True),
        schema_id=1,
    )

    converted = AvroSchemaConversion().avro_to_iceberg(record)

    assert wanted == converted
| |
| |
def test_avro_list_required_record() -> None:
    """An Avro array of records converts to a list whose element type is the matching struct."""
    # Record used as the array's item type: one plain boolean, one ["null", "boolean"] union.
    item_record = {
        "type": "record",
        "name": "r101",
        "fields": [
            {"name": "contains_null", "type": "boolean", "field-id": 102},
            {"name": "contains_nan", "type": ["null", "boolean"], "field-id": 103},
        ],
    }
    record = {
        "type": "record",
        "name": "avro_schema",
        "fields": [
            {
                "name": "array_with_record",
                "type": {"type": "array", "items": item_record, "element-id": 101},
                "field-id": 100,
            }
        ],
    }

    element_struct = StructType(
        NestedField(field_id=102, name="contains_null", field_type=BooleanType(), required=True),
        NestedField(field_id=103, name="contains_nan", field_type=BooleanType(), required=False),
    )
    wanted = Schema(
        NestedField(
            field_id=100,
            name="array_with_record",
            field_type=ListType(element_id=101, element_type=element_struct, element_required=True),
            required=True,
        ),
        schema_id=1,
        identifier_field_ids=[],
    )

    converted = AvroSchemaConversion().avro_to_iceberg(record)

    assert wanted == converted
| |
| |
def test_resolve_union() -> None:
    """The union ``["null", "string", "long"]`` must be rejected with a TypeError."""
    three_way_union = ["null", "string", "long"]

    with pytest.raises(TypeError) as raised:
        AvroSchemaConversion()._resolve_union(three_way_union)

    message = str(raised.value)
    assert "Non-optional types aren't part of the Iceberg specification" in message
| |
| |
def test_nested_type() -> None:
    """A primitive wrapped in an extra ``{"type": ...}`` layer converts to the primitive itself."""
    wrapped_string = {"type": {"type": "string"}}
    converted = AvroSchemaConversion()._convert_schema(wrapped_string)
    assert converted == StringType()
| |
| |
def test_map_type() -> None:
    """An Avro map with ``["null", "long"]`` values converts to a string-keyed map with optional long values."""
    wanted = MapType(
        key_id=101,
        key_type=StringType(),
        value_id=102,
        value_type=LongType(),
        value_required=False,
    )

    converted = AvroSchemaConversion()._convert_schema(
        {
            "type": "map",
            "values": ["null", "long"],
            "key-id": 101,
            "value-id": 102,
        }
    )

    assert converted == wanted
| |
| |
def test_fixed_type() -> None:
    """An Avro ``fixed`` of size 22 converts to ``FixedType(22)``."""
    fixed_schema = {"type": "fixed", "size": 22}
    converted = AvroSchemaConversion()._convert_schema(fixed_schema)
    assert converted == FixedType(22)
| |
| |
def test_unknown_primitive() -> None:
    """Converting an unrecognised primitive type name raises a TypeError that names the type."""
    bogus_name = "UnknownType"

    # Only the call expected to fail lives inside the raises block.
    with pytest.raises(TypeError) as raised:
        AvroSchemaConversion()._convert_schema(bogus_name)

    assert "Unknown type: UnknownType" in str(raised.value)
| |
| |
def test_unknown_complex_type() -> None:
    """Converting a dict schema with an unrecognised "type" raises a TypeError that echoes the dict."""
    bogus_schema = {"type": "UnknownType"}

    # Only the call expected to fail lives inside the raises block.
    with pytest.raises(TypeError) as raised:
        AvroSchemaConversion()._convert_schema(bogus_schema)

    assert "Unknown type: {'type': 'UnknownType'}" in str(raised.value)
| |
| |
def test_convert_field_without_field_id() -> None:
    """A field dict that has no "field-id" entry is rejected with a ValueError."""
    field_missing_id = {"name": "contains_null", "type": "boolean"}

    # Only the call expected to fail lives inside the raises block.
    with pytest.raises(ValueError) as raised:
        AvroSchemaConversion()._convert_field(field_missing_id)

    assert "Cannot convert field, missing field-id" in str(raised.value)
| |
| |
def test_convert_record_type_without_record() -> None:
    """Passing a schema whose "type" is not "record" to the record converter raises a ValueError."""
    not_a_record = {"type": "non-record", "name": "avro_schema", "fields": []}

    # Only the call expected to fail lives inside the raises block.
    with pytest.raises(ValueError) as raised:
        AvroSchemaConversion()._convert_record_type(not_a_record)

    assert "Expected record type, got" in str(raised.value)
| |
| |
def test_avro_list_missing_element_id() -> None:
    """An array definition that lacks "element-id" is rejected with a ValueError."""
    # The "element-id" key is deliberately left out of the array definition.
    array_without_element_id = {
        "type": "array",
        "items": "string",
        "default": [],
    }
    field_definition = {
        "name": "array_with_string",
        "type": array_without_element_id,
        "field-id": 100,
    }

    with pytest.raises(ValueError) as raised:
        AvroSchemaConversion()._convert_array_type(field_definition)

    message = str(raised.value)
    assert "Cannot convert array-type, missing element-id:" in message
| |
| |
def test_convert_decimal_type() -> None:
    """A bytes-backed "decimal" logical type converts to a DecimalType with the same precision and scale."""
    decimal_schema = {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 19,
        "scale": 25,
    }

    converted = AvroSchemaConversion()._convert_logical_type(decimal_schema)

    assert converted == DecimalType(precision=19, scale=25)
| |
| |
def test_convert_date_type() -> None:
    """An int-backed "date" logical type converts to DateType."""
    date_schema = {"type": "int", "logicalType": "date"}
    converted = AvroSchemaConversion()._convert_logical_type(date_schema)
    assert converted == DateType()
| |
| |
def test_unknown_logical_type() -> None:
    """A "date" logical type on a "bytes" physical type is rejected with a ValueError."""
    mismatched_schema = {"type": "bytes", "logicalType": "date"}

    with pytest.raises(ValueError) as raised:
        AvroSchemaConversion()._convert_logical_type(mismatched_schema)

    message = str(raised.value)
    assert "Unknown logical/physical type combination:" in message
| |
| |
def test_logical_map_with_invalid_fields() -> None:
    """A logical map whose key/value record has a third field is rejected with a ValueError."""
    # "key" and "value" plus one extra field, "other".
    key_value_record = {
        "type": "record",
        "name": "k101_v102",
        "fields": [
            {"name": "key", "type": "int", "field-id": 101},
            {"name": "value", "type": "string", "field-id": 102},
            {"name": "other", "type": "bytes", "field-id": 103},
        ],
    }
    logical_map = {
        "type": "array",
        "logicalType": "map",
        "items": key_value_record,
    }

    with pytest.raises(ValueError) as raised:
        AvroSchemaConversion()._convert_logical_map_type(logical_map)

    message = str(raised.value)
    assert "Invalid key-value pair schema:" in message
| |
| |
def test_iceberg_to_avro_manifest_list(avro_schema_manifest_file_v1: Dict[str, Any]) -> None:
    """Round-trip the manifest-list schema: Avro -> Iceberg -> Avro must give back the fixture."""
    as_iceberg = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file_v1)

    back_to_avro = AvroSchemaConversion().iceberg_to_avro(
        as_iceberg,
        schema_name="manifest_file",
    )

    assert avro_schema_manifest_file_v1 == back_to_avro
| |
| |
def test_iceberg_to_avro_manifest(avro_schema_manifest_entry: Dict[str, Any]) -> None:
    """Round-trip the manifest-entry schema: Avro -> Iceberg -> Avro must give back the fixture."""
    as_iceberg = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_entry)

    back_to_avro = AvroSchemaConversion().iceberg_to_avro(
        as_iceberg,
        schema_name="manifest_entry",
    )

    assert avro_schema_manifest_entry == back_to_avro