# blob: 5b94060f40002698266f138d19ff17d6181bbe17 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=protected-access
import json
import pytest
from pyiceberg.avro.file import AvroFile
from pyiceberg.avro.reader import (
AvroStruct,
BinaryReader,
BooleanReader,
DateReader,
DecimalReader,
DoubleReader,
FixedReader,
FloatReader,
IntegerReader,
StringReader,
TimeReader,
TimestampReader,
TimestamptzReader,
UUIDReader,
primitive_reader,
)
from pyiceberg.manifest import _convert_pos_to_dict
from pyiceberg.schema import Schema
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
from tests.io.test_io import LocalInputFile
def _optional_map_field(field_id: int, doc: str, name: str, key_id: int, value_id: int, value_type: str) -> dict:
    """Return the expected Avro field for one optional map column.

    In the expected schema every such field is a ``["null", array]`` union
    whose array is tagged ``"logicalType": "map"`` and holds ``key``/``value``
    records named ``k<key_id>_v<value_id>``. Keys are always ``"int"``; only
    the value type varies between fields.
    """
    key_value_record = {
        "type": "record",
        "name": f"k{key_id}_v{value_id}",
        "fields": [
            {"field-id": key_id, "name": "key", "type": "int"},
            {"field-id": value_id, "name": "value", "type": value_type},
        ],
    }
    return {
        "field-id": field_id,
        "doc": doc,
        "default": None,
        "name": name,
        "type": ["null", {"logicalType": "map", "type": "array", "items": key_value_record}],
    }


def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_schema: Schema):
    """Check the magic bytes, embedded Avro schema and converted schema of a manifest-entry header."""
    partition_record = {
        "type": "record",
        "name": "r102",
        "fields": [{"field-id": 1000, "default": None, "name": "VendorID", "type": ["null", "int"]}],
    }
    data_file_fields = [
        {"field-id": 100, "doc": "Location URI with FS scheme", "name": "file_path", "type": "string"},
        {
            "field-id": 101,
            "doc": "File format name: avro, orc, or parquet",
            "name": "file_format",
            "type": "string",
        },
        {"field-id": 102, "name": "partition", "type": partition_record},
        {"field-id": 103, "doc": "Number of records in the file", "name": "record_count", "type": "long"},
        {"field-id": 104, "doc": "Total file size in bytes", "name": "file_size_in_bytes", "type": "long"},
        {"field-id": 105, "name": "block_size_in_bytes", "type": "long"},
        _optional_map_field(108, "Map of column id to total size on disk", "column_sizes", 117, 118, "long"),
        _optional_map_field(
            109, "Map of column id to total count, including null and NaN", "value_counts", 119, 120, "long"
        ),
        _optional_map_field(110, "Map of column id to null value count", "null_value_counts", 121, 122, "long"),
        _optional_map_field(
            137, "Map of column id to number of NaN values in the column", "nan_value_counts", 138, 139, "long"
        ),
        _optional_map_field(125, "Map of column id to lower bound", "lower_bounds", 126, 127, "bytes"),
        _optional_map_field(128, "Map of column id to upper bound", "upper_bounds", 129, 130, "bytes"),
        {
            "field-id": 131,
            "doc": "Encryption key metadata blob",
            "default": None,
            "name": "key_metadata",
            "type": ["null", "bytes"],
        },
        {
            "field-id": 132,
            "doc": "Splittable offsets",
            "default": None,
            "name": "split_offsets",
            "type": ["null", {"element-id": 133, "type": "array", "items": "long"}],
        },
        {
            "field-id": 140,
            "doc": "Sort order ID",
            "default": None,
            "name": "sort_order_id",
            "type": ["null", "int"],
        },
    ]
    expected_avro_schema = {
        "type": "record",
        "name": "manifest_entry",
        "fields": [
            {"field-id": 0, "name": "status", "type": "int"},
            {"field-id": 1, "default": None, "name": "snapshot_id", "type": ["null", "long"]},
            {
                "field-id": 2,
                "name": "data_file",
                "type": {"type": "record", "name": "r2", "fields": data_file_fields},
            },
        ],
    }

    with AvroFile(LocalInputFile(generated_manifest_entry_file)) as reader:
        header = reader._read_header()

    assert header.magic == b"Obj\x01"
    # The schema is stored as JSON text under the "avro.schema" metadata key.
    assert json.loads(header.meta["avro.schema"]) == expected_avro_schema
    assert header.get_schema() == iceberg_manifest_entry_schema
def test_read_manifest_entry_file(generated_manifest_entry_file: str):
    """Read the generated manifest-entry file and compare its first record field by field."""
    with AvroFile(LocalInputFile(generated_manifest_entry_file)) as reader:
        # Iterating the reader yields the records; collect them all up front.
        records = list(reader)

    assert len(records) == 2, f"Expected 2 records, got {len(records)}"

    column_sizes = {
        1: 53,
        2: 98153,
        3: 98693,
        4: 53,
        5: 53,
        6: 53,
        7: 17425,
        8: 18528,
        9: 53,
        10: 44788,
        11: 35571,
        12: 53,
        13: 1243,
        14: 2355,
        15: 12750,
        16: 4029,
        17: 110,
        18: 47194,
        19: 2948,
    }
    # Columns 1..19 all report the same value count.
    value_counts = dict.fromkeys(range(1, 20), 19513)
    # These columns report 19513 nulls; every other column in 1..19 reports 0.
    fully_null_column_ids = {1, 4, 5, 6, 9, 12}
    null_value_counts = {
        column_id: (19513 if column_id in fully_null_column_ids else 0) for column_id in range(1, 20)
    }
    # Only columns 10..19 carry a NaN count, and each one is 0.
    nan_value_counts = dict.fromkeys(range(10, 20), 0)
    lower_bounds = {
        2: b"2020-04-01 00:00",
        3: b"2020-04-01 00:12",
        7: b"\x03\x00\x00\x00",
        8: b"\x01\x00\x00\x00",
        10: b"\xf6(\\\x8f\xc2\x05S\xc0",
        11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
        13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
        14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
        15: b")\\\x8f\xc2\xf5(\x08\xc0",
        16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
        17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
        18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
        19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
    }
    upper_bounds = {
        2: b"2020-04-30 23:5:",
        3: b"2020-05-01 00:41",
        7: b"\t\x01\x00\x00",
        8: b"\t\x01\x00\x00",
        10: b"\xcd\xcc\xcc\xcc\xcc,_@",
        11: b"\x1f\x85\xebQ\\\xe2\xfe@",
        13: b"\x00\x00\x00\x00\x00\x00\x12@",
        14: b"\x00\x00\x00\x00\x00\x00\xe0?",
        15: b"q=\n\xd7\xa3\xf01@",
        16: b"\x00\x00\x00\x00\x00`B@",
        17: b"333333\xd3?",
        18: b"\x00\x00\x00\x00\x00\x18b@",
        19: b"\x00\x00\x00\x00\x00\x00\x04@",
    }

    expected_data_file = AvroStruct(
        _data=[
            "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
            "PARQUET",
            AvroStruct(_data=[None]),
            19513,
            388872,
            67108864,
            column_sizes,
            value_counts,
            null_value_counts,
            nan_value_counts,
            lower_bounds,
            upper_bounds,
            None,
            [4],
            0,
        ]
    )
    expected_entry = AvroStruct(_data=[1, 8744736658442914487, expected_data_file])

    # Only the first of the two records is compared in detail.
    assert records[0] == expected_entry
def test_read_manifest_file_file(generated_manifest_file_file: str):
    """Read the generated manifest-list file and compare its single record with the expected values."""
    with AvroFile(LocalInputFile(generated_manifest_file_file)) as reader:
        # Iterating the reader yields the records; collect them all up front.
        records = list(reader)

    assert len(records) == 1, f"Expected 1 records, got {len(records)}"

    manifest = records[0]
    nested_entry = AvroStruct(_data=[True, False, b"\x01\x00\x00\x00", b"\x02\x00\x00\x00"])
    expected_fields = [
        # Position 0 is copied from the record itself, so it is never really compared.
        # NOTE(review): presumably an environment-dependent path — confirm.
        manifest.get(0),
        7989,
        0,
        9182715666859759686,
        3,
        0,
        0,
        [nested_entry],
        237993,
        0,
        0,
    ]

    assert manifest == AvroStruct(_data=expected_fields)
def test_null_list_convert_pos_to_dict():
    """A None stored in a list-typed position comes back as None under the field name."""
    list_type = ListType(element_id=2, element=StringType(), element_required=False)
    schema = Schema(NestedField(name="field", field_id=1, field_type=list_type))

    converted = _convert_pos_to_dict(schema, AvroStruct([None]))

    assert converted["field"] is None
def test_null_dict_convert_pos_to_dict():
    """A None stored in a map-typed position comes back as None under the field name."""
    map_type = MapType(key_id=2, key_type=StringType(), value_id=3, value_type=StringType(), value_required=False)
    schema = Schema(NestedField(name="field", field_id=1, field_type=map_type))

    converted = _convert_pos_to_dict(schema, AvroStruct([None]))

    assert converted["field"] is None
def test_null_struct_convert_pos_to_dict():
    """A None stored in an optional struct-typed position comes back as None under the field name."""
    struct_type = StructType(
        NestedField(2, "required_field", StringType(), True),
        NestedField(3, "optional_field", IntegerType()),
    )
    schema = Schema(NestedField(name="field", field_id=1, field_type=struct_type, required=False))

    converted = _convert_pos_to_dict(schema, AvroStruct([None]))

    assert converted["field"] is None
def test_fixed_reader():
    """primitive_reader(FixedType(22)) must equal FixedReader(22)."""
    resolved = primitive_reader(FixedType(22))
    assert resolved == FixedReader(22)
def test_decimal_reader():
    """primitive_reader(DecimalType(19, 25)) must equal DecimalReader(19, 25)."""
    resolved = primitive_reader(DecimalType(19, 25))
    assert resolved == DecimalReader(19, 25)
def test_boolean_reader():
    """primitive_reader(BooleanType()) must equal BooleanReader()."""
    resolved = primitive_reader(BooleanType())
    assert resolved == BooleanReader()
def test_integer_reader():
    """primitive_reader(IntegerType()) must equal IntegerReader()."""
    resolved = primitive_reader(IntegerType())
    assert resolved == IntegerReader()
def test_long_reader():
    """primitive_reader(LongType()) must equal IntegerReader(); no separate long reader is expected here."""
    resolved = primitive_reader(LongType())
    assert resolved == IntegerReader()
def test_float_reader():
    """primitive_reader(FloatType()) must equal FloatReader()."""
    resolved = primitive_reader(FloatType())
    assert resolved == FloatReader()
def test_double_reader():
    """primitive_reader(DoubleType()) must equal DoubleReader()."""
    resolved = primitive_reader(DoubleType())
    assert resolved == DoubleReader()
def test_date_reader():
    """primitive_reader(DateType()) must equal DateReader()."""
    resolved = primitive_reader(DateType())
    assert resolved == DateReader()
def test_time_reader():
    """primitive_reader(TimeType()) must equal TimeReader()."""
    resolved = primitive_reader(TimeType())
    assert resolved == TimeReader()
def test_timestamp_reader():
    """primitive_reader(TimestampType()) must equal TimestampReader()."""
    resolved = primitive_reader(TimestampType())
    assert resolved == TimestampReader()
def test_timestamptz_reader():
    """primitive_reader(TimestamptzType()) must equal TimestamptzReader()."""
    resolved = primitive_reader(TimestamptzType())
    assert resolved == TimestamptzReader()
def test_string_reader():
    """primitive_reader(StringType()) must equal StringReader()."""
    resolved = primitive_reader(StringType())
    assert resolved == StringReader()
def test_binary_reader():
    """primitive_reader(BinaryType()) must equal BinaryReader()."""
    resolved = primitive_reader(BinaryType())
    assert resolved == BinaryReader()
def test_unknown_type():
    """primitive_reader must raise ValueError containing "Unknown type:" for a PrimitiveType subclass defined only here."""

    class UnknownType(PrimitiveType):
        __root__ = "UnknownType"

    # `match` searches the exception text, which for this plain literal is
    # the same as checking that the substring is present.
    with pytest.raises(ValueError, match="Unknown type:"):
        primitive_reader(UnknownType())
def test_uuid_reader() -> None:
    """primitive_reader(UUIDType()) must equal UUIDReader()."""
    resolved = primitive_reader(UUIDType())
    assert resolved == UUIDReader()