| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint:disable=protected-access |
| import json |
| from typing import Callable |
| |
| import pytest |
| |
| from pyiceberg.avro.decoder import BinaryDecoder, StreamingBinaryDecoder |
| from pyiceberg.avro.decoder_fast import CythonBinaryDecoder |
| from pyiceberg.avro.file import AvroFile |
| from pyiceberg.avro.reader import ( |
| BinaryReader, |
| BooleanReader, |
| DateReader, |
| DecimalReader, |
| DoubleReader, |
| FixedReader, |
| FloatReader, |
| IntegerReader, |
| StringReader, |
| StructReader, |
| TimeReader, |
| TimestampNanoReader, |
| TimestampReader, |
| TimestamptzNanoReader, |
| TimestamptzReader, |
| UnknownReader, |
| UUIDReader, |
| ) |
| from pyiceberg.avro.resolver import construct_reader |
| from pyiceberg.io.pyarrow import PyArrowFileIO |
| from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMAS, DataFile, ManifestEntry |
| from pyiceberg.schema import Schema |
| from pyiceberg.typedef import Record |
| from pyiceberg.types import ( |
| BinaryType, |
| BooleanType, |
| DateType, |
| DecimalType, |
| DoubleType, |
| FixedType, |
| FloatType, |
| IntegerType, |
| LongType, |
| NestedField, |
| StringType, |
| StructType, |
| TimestampNanoType, |
| TimestampType, |
| TimestamptzNanoType, |
| TimestamptzType, |
| TimeType, |
| UnknownType, |
| UUIDType, |
| ) |
| |
# Decoder implementations that the parametrized struct-reading tests below run against.
AVAILABLE_DECODERS = [
    StreamingBinaryDecoder,
    CythonBinaryDecoder,
]
| |
| |
def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_schema: Schema) -> None:
    """Open a generated manifest-entry Avro file and check its header.

    Checks three things:

    * the magic bytes;
    * the exact JSON schema stored under the ``avro.schema`` metadata key;
    * that the Iceberg schema derived from the header equals the
      ``iceberg_manifest_entry_schema`` fixture.
    """
    input_file = PyArrowFileIO().new_input(generated_manifest_entry_file)
    # NOTE(review): keys look like field ids (2 is the data_file field id below;
    # -1 presumably denotes the top-level record) — confirm against AvroFile.
    read_types = {-1: ManifestEntry, 2: DataFile}
    with AvroFile[ManifestEntry](input_file, MANIFEST_ENTRY_SCHEMAS[2], read_types) as avro_file:
        header = avro_file.header

    def map_field(field_id: int, name: str, doc: str, key_id: int, value_id: int, value_type: str) -> dict[str, object]:
        """Build the optional int-keyed map field shape shared by the column metric fields.

        The Avro encoding is an array of key/value records tagged with
        ``logicalType: map``.
        """
        return {
            "field-id": field_id,
            "doc": doc,
            "default": None,
            "name": name,
            "type": [
                "null",
                {
                    "logicalType": "map",
                    "type": "array",
                    "items": {
                        "type": "record",
                        "name": f"k{key_id}_v{value_id}",
                        "fields": [
                            {"field-id": key_id, "name": "key", "type": "int"},
                            {"field-id": value_id, "name": "value", "type": value_type},
                        ],
                    },
                },
            ],
        }

    partition_type = {
        "type": "record",
        "name": "r102",
        "fields": [
            {"field-id": 1000, "default": None, "name": "VendorID", "type": ["null", "int"]},
            {
                "field-id": 1001,
                "default": None,
                "name": "tpep_pickup_datetime",
                "type": ["null", {"type": "int", "logicalType": "date"}],
            },
        ],
    }

    data_file_fields = [
        {"field-id": 100, "doc": "Location URI with FS scheme", "name": "file_path", "type": "string"},
        {
            "field-id": 101,
            "doc": "File format name: avro, orc, or parquet",
            "name": "file_format",
            "type": "string",
        },
        {"field-id": 102, "name": "partition", "type": partition_type},
        {"field-id": 103, "doc": "Number of records in the file", "name": "record_count", "type": "long"},
        {"field-id": 104, "doc": "Total file size in bytes", "name": "file_size_in_bytes", "type": "long"},
        {"field-id": 105, "name": "block_size_in_bytes", "type": "long"},
        map_field(108, "column_sizes", "Map of column id to total size on disk", 117, 118, "long"),
        map_field(109, "value_counts", "Map of column id to total count, including null and NaN", 119, 120, "long"),
        map_field(110, "null_value_counts", "Map of column id to null value count", 121, 122, "long"),
        map_field(137, "nan_value_counts", "Map of column id to number of NaN values in the column", 138, 139, "long"),
        map_field(125, "lower_bounds", "Map of column id to lower bound", 126, 127, "bytes"),
        map_field(128, "upper_bounds", "Map of column id to upper bound", 129, 130, "bytes"),
        {
            "field-id": 131,
            "doc": "Encryption key metadata blob",
            "default": None,
            "name": "key_metadata",
            "type": ["null", "bytes"],
        },
        {
            "field-id": 132,
            "doc": "Splittable offsets",
            "default": None,
            "name": "split_offsets",
            "type": ["null", {"element-id": 133, "type": "array", "items": "long"}],
        },
        {
            "field-id": 140,
            "doc": "Sort order ID",
            "default": None,
            "name": "sort_order_id",
            "type": ["null", "int"],
        },
    ]

    expected_avro_schema = {
        "type": "record",
        "name": "manifest_entry",
        "fields": [
            {"field-id": 0, "name": "status", "type": "int"},
            {"field-id": 1, "default": None, "name": "snapshot_id", "type": ["null", "long"]},
            {
                "field-id": 2,
                "name": "data_file",
                "type": {"type": "record", "name": "r2", "fields": data_file_fields},
            },
        ],
    }

    # Avro object container files start with the bytes "Obj" followed by version byte 1.
    assert header.magic == b"Obj\x01"
    assert json.loads(header.meta["avro.schema"]) == expected_avro_schema
    assert header.get_schema() == iceberg_manifest_entry_schema
| |
| |
def test_fixed_reader() -> None:
    """A FixedType resolves to a FixedReader with the same length."""
    reader = construct_reader(FixedType(22))
    assert reader == FixedReader(22)
| |
| |
def test_decimal_reader() -> None:
    """A DecimalType resolves to a DecimalReader built from the same two arguments."""
    reader = construct_reader(DecimalType(19, 25))
    assert reader == DecimalReader(19, 25)
| |
| |
def test_boolean_reader() -> None:
    """A BooleanType resolves to a BooleanReader."""
    reader = construct_reader(BooleanType())
    assert reader == BooleanReader()
| |
| |
def test_integer_reader() -> None:
    """An IntegerType resolves to an IntegerReader."""
    reader = construct_reader(IntegerType())
    assert reader == IntegerReader()
| |
| |
def test_long_reader() -> None:
    """A LongType also resolves to an IntegerReader.

    Avro writes int and long with the same zig-zag varint encoding.
    """
    reader = construct_reader(LongType())
    assert reader == IntegerReader()
| |
| |
def test_float_reader() -> None:
    """A FloatType resolves to a FloatReader."""
    reader = construct_reader(FloatType())
    assert reader == FloatReader()
| |
| |
def test_double_reader() -> None:
    """A DoubleType resolves to a DoubleReader."""
    reader = construct_reader(DoubleType())
    assert reader == DoubleReader()
| |
| |
def test_date_reader() -> None:
    """A DateType resolves to a DateReader."""
    reader = construct_reader(DateType())
    assert reader == DateReader()
| |
| |
def test_time_reader() -> None:
    """A TimeType resolves to a TimeReader."""
    reader = construct_reader(TimeType())
    assert reader == TimeReader()
| |
| |
def test_timestamp_reader() -> None:
    """A TimestampType resolves to a TimestampReader."""
    reader = construct_reader(TimestampType())
    assert reader == TimestampReader()
| |
| |
def test_timestamp_ns_reader() -> None:
    """A TimestampNanoType resolves to a TimestampNanoReader."""
    reader = construct_reader(TimestampNanoType())
    assert reader == TimestampNanoReader()
| |
| |
def test_timestamptz_reader() -> None:
    """A TimestamptzType resolves to a TimestamptzReader."""
    reader = construct_reader(TimestamptzType())
    assert reader == TimestamptzReader()
| |
| |
def test_timestamptz_ns_reader() -> None:
    """A TimestamptzNanoType resolves to a TimestamptzNanoReader."""
    reader = construct_reader(TimestamptzNanoType())
    assert reader == TimestamptzNanoReader()
| |
| |
def test_string_reader() -> None:
    """A StringType resolves to a StringReader."""
    reader = construct_reader(StringType())
    assert reader == StringReader()
| |
| |
def test_binary_reader() -> None:
    """A BinaryType resolves to a BinaryReader."""
    reader = construct_reader(BinaryType())
    assert reader == BinaryReader()
| |
| |
def test_unknown_type() -> None:
    """An UnknownType resolves to an UnknownReader."""
    reader = construct_reader(UnknownType())
    assert reader == UnknownReader()
| |
| |
def test_uuid_reader() -> None:
    """A UUIDType resolves to a UUIDReader."""
    reader = construct_reader(UUIDType())
    assert reader == UUIDReader()
| |
| |
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_struct(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """Decode a struct with a single required int field into a Record, using each decoder."""
    # 0x18 is the zig-zag varint encoding of 12.
    decoder = decoder_class(b"\x18")
    id_struct = StructType(NestedField(1, "id", IntegerType(), required=True))
    field_readers = ((0, IntegerReader()),)
    reader = StructReader(field_readers, Record, id_struct)
    record = reader.read(decoder)
    assert repr(record) == "Record[12]"
| |
| |
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_struct_lambda(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """StructReader accepts an arbitrary callable that returns a struct, not only a class.

    A lambda that forwards every argument to ``Record`` must produce the same
    decoded record as passing the ``Record`` class itself.
    """
    # 0x18 is the zig-zag varint encoding of 12.
    decoder = decoder_class(b"\x18")

    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
    # You can also pass in an arbitrary function that returns a struct. The lambda
    # forwards whatever arguments StructReader supplies, so it builds exactly the
    # Record that passing the class directly would.
    result = StructReader(
        ((0, IntegerReader()),),
        lambda *args, **kwargs: Record(*args, **kwargs),  # pylint: disable=unnecessary-lambda
        struct,
    ).read(decoder)
    assert repr(result) == "Record[12]"
| |
| |
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_not_struct_type(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    """A create_struct callable whose result is not a StructProtocol raises ValueError."""
    decoder = decoder_class(b"\x18")
    id_struct = StructType(NestedField(1, "id", IntegerType(), required=True))

    with pytest.raises(ValueError) as exc_info:
        StructReader(((0, IntegerReader()),), str, id_struct).read(decoder)  # type: ignore

    message = str(exc_info.value)
    assert "Incompatible with StructProtocol: <class 'str'>" in message