# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=protected-access,unused-argument,redefined-outer-name
import math
import tempfile
import uuid
from dataclasses import asdict, dataclass
from datetime import (
date,
datetime,
time,
timedelta,
timezone,
)
from decimal import Decimal
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
Union,
)
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from pyiceberg.avro import (
STRUCT_BOOL,
STRUCT_DOUBLE,
STRUCT_FLOAT,
STRUCT_INT32,
STRUCT_INT64,
)
from pyiceberg.io.pyarrow import (
MetricModeTypes,
MetricsMode,
PyArrowStatisticsCollector,
compute_statistics_plan,
data_file_statistics_from_parquet_metadata,
match_metrics_mode,
parquet_path_to_id_mapping,
schema_to_pyarrow,
)
from pyiceberg.manifest import DataFile
from pyiceberg.schema import Schema, pre_order_visit
from pyiceberg.table.metadata import (
TableMetadata,
TableMetadataUtil,
TableMetadataV1,
TableMetadataV2,
)
from pyiceberg.types import (
BooleanType,
FloatType,
IntegerType,
StringType,
)
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros


@dataclass(frozen=True)
class TestStruct:
__test__ = False
x: Optional[int]
y: Optional[float]


def construct_test_table(
write_statistics: Union[bool, List[str]] = True,
) -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
table_metadata = {
"format-version": 2,
"location": "s3://bucket/test/location",
"last-column-id": 7,
"current-schema-id": 0,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{"id": 1, "name": "strings", "required": False, "type": "string"},
{"id": 2, "name": "floats", "required": False, "type": "float"},
{
"id": 3,
"name": "list",
"required": False,
"type": {"type": "list", "element-id": 6, "element": "long", "element-required": False},
},
{
"id": 4,
"name": "maps",
"required": False,
"type": {
"type": "map",
"key-id": 7,
"key": "long",
"value-id": 8,
"value": "long",
"value-required": False,
},
},
{
"id": 5,
"name": "structs",
"required": False,
"type": {
"type": "struct",
"fields": [
{"id": 9, "name": "x", "required": False, "type": "long"},
{"id": 10, "name": "y", "required": False, "type": "float", "doc": "comment"},
],
},
},
],
},
],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": []}],
"properties": {},
}
table_metadata = TableMetadataUtil.parse_obj(table_metadata)
arrow_schema = schema_to_pyarrow(table_metadata.schemas[0])
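    # The sample data deliberately mixes None values, NaN, and nested list/map/struct
    # columns so the statistics tests can check counts and bounds per field ID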
_strings = ["zzzzzzzzzzzzzzzzzzzz", "rrrrrrrrrrrrrrrrrrrr", None, "aaaaaaaaaaaaaaaaaaaa"]
_floats = [3.14, math.nan, 1.69, 100]
_list = [[1, 2, 3], [4, 5, 6], None, [7, 8, 9]]
_maps: List[Optional[Dict[int, int]]] = [
{1: 2, 3: 4},
None,
{5: 6},
{},
]
_structs = [
asdict(TestStruct(1, 0.2)),
asdict(TestStruct(None, -1.34)),
None,
asdict(TestStruct(54, None)),
]
table = pa.Table.from_pydict(
{
"strings": _strings,
"floats": _floats,
"list": _list,
"maps": _maps,
"structs": _structs,
},
schema=arrow_schema,
)
metadata_collector: List[Any] = []
with pa.BufferOutputStream() as f:
with pq.ParquetWriter(
f, table.schema, metadata_collector=metadata_collector, write_statistics=write_statistics
) as writer:
writer.write_table(table)
return metadata_collector[0], table_metadata


def get_current_schema(
table_metadata: TableMetadata,
) -> Schema:
return next(filter(lambda s: s.schema_id == table_metadata.current_schema_id, table_metadata.schemas))


def test_record_count() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert datafile.record_count == 4


def test_value_counts() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.value_counts) == 7
assert datafile.value_counts[1] == 4
assert datafile.value_counts[2] == 4
assert datafile.value_counts[6] == 10 # 3 lists with 3 items and a None value
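    # 3 map entries, plus one slot each for the None map and the empty map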
assert datafile.value_counts[7] == 5
assert datafile.value_counts[8] == 5
assert datafile.value_counts[9] == 4
assert datafile.value_counts[10] == 4


def test_column_sizes() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.column_sizes) == 7
    # these values are an artifact of how write_table encodes the columns
assert datafile.column_sizes[1] > 0
assert datafile.column_sizes[2] > 0
assert datafile.column_sizes[6] > 0
assert datafile.column_sizes[7] > 0
assert datafile.column_sizes[8] > 0


def test_null_and_nan_counts() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.null_value_counts) == 7
assert datafile.null_value_counts[1] == 1
assert datafile.null_value_counts[2] == 0
assert datafile.null_value_counts[6] == 1
assert datafile.null_value_counts[7] == 2
assert datafile.null_value_counts[8] == 2
assert datafile.null_value_counts[9] == 2
assert datafile.null_value_counts[10] == 2
    # Arrow does not include NaN counts in the Parquet statistics
# assert len(datafile.nan_value_counts) == 3
# assert datafile.nan_value_counts[1] == 0
# assert datafile.nan_value_counts[2] == 1
# assert datafile.nan_value_counts[3] == 0


def test_bounds() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
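    # The default metrics mode is truncate(16): the string lower bound is truncated to
    # 16 characters, and the upper bound's last character is incremented ("z" -> "{")
    # so the truncated value still bounds the column from above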
assert len(datafile.lower_bounds) == 2
assert datafile.lower_bounds[1].decode() == "aaaaaaaaaaaaaaaa"
assert datafile.lower_bounds[2] == STRUCT_FLOAT.pack(1.69)
assert len(datafile.upper_bounds) == 2
assert datafile.upper_bounds[1].decode() == "zzzzzzzzzzzzzzz{"
assert datafile.upper_bounds[2] == STRUCT_FLOAT.pack(100)


def test_metrics_mode_parsing() -> None:
assert match_metrics_mode("none") == MetricsMode(MetricModeTypes.NONE)
assert match_metrics_mode("nOnE") == MetricsMode(MetricModeTypes.NONE)
assert match_metrics_mode("counts") == MetricsMode(MetricModeTypes.COUNTS)
assert match_metrics_mode("Counts") == MetricsMode(MetricModeTypes.COUNTS)
assert match_metrics_mode("full") == MetricsMode(MetricModeTypes.FULL)
assert match_metrics_mode("FuLl") == MetricsMode(MetricModeTypes.FULL)
assert match_metrics_mode(" FuLl") == MetricsMode(MetricModeTypes.FULL)
assert match_metrics_mode("truncate(16)") == MetricsMode(MetricModeTypes.TRUNCATE, 16)
assert match_metrics_mode("trUncatE(16)") == MetricsMode(MetricModeTypes.TRUNCATE, 16)
assert match_metrics_mode("trUncatE(7)") == MetricsMode(MetricModeTypes.TRUNCATE, 7)
assert match_metrics_mode("trUncatE(07)") == MetricsMode(MetricModeTypes.TRUNCATE, 7)
with pytest.raises(ValueError) as exc_info:
match_metrics_mode("trUncatE(-7)")
assert "Malformed truncate: trUncatE(-7)" in str(exc_info.value)
with pytest.raises(ValueError) as exc_info:
match_metrics_mode("trUncatE(0)")
assert "Truncation length must be larger than 0" in str(exc_info.value)


def test_metrics_mode_none() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
table_metadata.properties["write.metadata.metrics.default"] = "none"
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.value_counts) == 0
assert len(datafile.null_value_counts) == 0
assert len(datafile.nan_value_counts) == 0
assert len(datafile.lower_bounds) == 0
assert len(datafile.upper_bounds) == 0


def test_metrics_mode_counts() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
table_metadata.properties["write.metadata.metrics.default"] = "counts"
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.value_counts) == 7
assert len(datafile.null_value_counts) == 7
assert len(datafile.nan_value_counts) == 0
assert len(datafile.lower_bounds) == 0
assert len(datafile.upper_bounds) == 0


def test_metrics_mode_full() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
table_metadata.properties["write.metadata.metrics.default"] = "full"
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.value_counts) == 7
assert len(datafile.null_value_counts) == 7
assert len(datafile.nan_value_counts) == 0
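    # "full" mode keeps complete, untruncated bounds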
assert len(datafile.lower_bounds) == 2
assert datafile.lower_bounds[1].decode() == "aaaaaaaaaaaaaaaaaaaa"
assert datafile.lower_bounds[2] == STRUCT_FLOAT.pack(1.69)
assert len(datafile.upper_bounds) == 2
assert datafile.upper_bounds[1].decode() == "zzzzzzzzzzzzzzzzzzzz"
assert datafile.upper_bounds[2] == STRUCT_FLOAT.pack(100)


def test_metrics_mode_non_default_trunc() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)"
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.value_counts) == 7
assert len(datafile.null_value_counts) == 7
assert len(datafile.nan_value_counts) == 0
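    # truncate(2) keeps only the first two characters; incrementing the last character
    # of the truncated upper bound turns "zz" into "z{"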
assert len(datafile.lower_bounds) == 2
assert datafile.lower_bounds[1].decode() == "aa"
assert datafile.lower_bounds[2] == STRUCT_FLOAT.pack(1.69)
assert len(datafile.upper_bounds) == 2
assert datafile.upper_bounds[1].decode() == "z{"
assert datafile.upper_bounds[2] == STRUCT_FLOAT.pack(100)


def test_column_metrics_mode() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)"
table_metadata.properties["write.metadata.metrics.column.strings"] = "none"
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
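    # The per-column "none" setting overrides the table default, so field 1 ("strings")
    # contributes no counts or bounds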
assert len(datafile.value_counts) == 6
assert len(datafile.null_value_counts) == 6
assert len(datafile.nan_value_counts) == 0
assert len(datafile.lower_bounds) == 1
assert datafile.lower_bounds[2] == STRUCT_FLOAT.pack(1.69)
assert 1 not in datafile.lower_bounds
assert len(datafile.upper_bounds) == 1
assert datafile.upper_bounds[2] == STRUCT_FLOAT.pack(100)
assert 1 not in datafile.upper_bounds


def construct_test_table_primitive_types() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
table_metadata = {
"format-version": 2,
"location": "s3://bucket/test/location",
"last-column-id": 7,
"current-schema-id": 0,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{"id": 1, "name": "booleans", "required": False, "type": "boolean"},
{"id": 2, "name": "ints", "required": False, "type": "int"},
{"id": 3, "name": "longs", "required": False, "type": "long"},
{"id": 4, "name": "floats", "required": False, "type": "float"},
{"id": 5, "name": "doubles", "required": False, "type": "double"},
{"id": 6, "name": "dates", "required": False, "type": "date"},
{"id": 7, "name": "times", "required": False, "type": "time"},
{"id": 8, "name": "timestamps", "required": False, "type": "timestamp"},
{"id": 9, "name": "timestamptzs", "required": False, "type": "timestamptz"},
{"id": 10, "name": "strings", "required": False, "type": "string"},
{"id": 11, "name": "uuids", "required": False, "type": "uuid"},
{"id": 12, "name": "binaries", "required": False, "type": "binary"},
{"id": 13, "name": "decimal8", "required": False, "type": "decimal(5, 2)"},
{"id": 14, "name": "decimal16", "required": False, "type": "decimal(16, 6)"},
{"id": 15, "name": "decimal32", "required": False, "type": "decimal(19, 6)"},
],
},
],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": []}],
"properties": {},
}
table_metadata = TableMetadataUtil.parse_obj(table_metadata)
arrow_schema = schema_to_pyarrow(table_metadata.schemas[0])
tz = timezone(timedelta(seconds=19800))
booleans = [True, False]
ints = [23, 89]
longs = [54, 2]
floats = [454.1223, 24342.29]
doubles = [8542.12, -43.9]
dates = [date(2022, 1, 2), date(2023, 2, 4)]
times = [time(17, 30, 34), time(13, 21, 4)]
timestamps = [datetime(2022, 1, 2, 17, 30, 34, 399), datetime(2023, 2, 4, 13, 21, 4, 354)]
timestamptzs = [datetime(2022, 1, 2, 17, 30, 34, 399, tz), datetime(2023, 2, 4, 13, 21, 4, 354, tz)]
strings = ["hello", "world"]
uuids = [uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes, uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes]
binaries = [b"hello", b"world"]
decimal8 = pa.array([Decimal("123.45"), Decimal("678.91")], pa.decimal128(8, 2))
decimal16 = pa.array([Decimal("12345679.123456"), Decimal("67891234.678912")], pa.decimal128(16, 6))
decimal32 = pa.array([Decimal("1234567890123.123456"), Decimal("9876543210703.654321")], pa.decimal128(19, 6))
table = pa.Table.from_pydict(
{
"booleans": booleans,
"ints": ints,
"longs": longs,
"floats": floats,
"doubles": doubles,
"dates": dates,
"times": times,
"timestamps": timestamps,
"timestamptzs": timestamptzs,
"strings": strings,
"uuids": uuids,
"binaries": binaries,
"decimal8": decimal8,
"decimal16": decimal16,
"decimal32": decimal32,
},
schema=arrow_schema,
)
metadata_collector: List[Any] = []
with pa.BufferOutputStream() as f:
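        # store_decimal_as_integer=True writes decimals with precision <= 18 as INT32/INT64
        # physical types instead of fixed-length byte arrays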
with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector, store_decimal_as_integer=True) as writer:
writer.write_table(table)
return metadata_collector[0], table_metadata


def test_metrics_primitive_types() -> None:
metadata, table_metadata = construct_test_table_primitive_types()
schema = get_current_schema(table_metadata)
table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)"
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.value_counts) == 15
assert len(datafile.null_value_counts) == 15
assert len(datafile.nan_value_counts) == 0
tz = timezone(timedelta(seconds=19800))
assert len(datafile.lower_bounds) == 15
assert datafile.lower_bounds[1] == STRUCT_BOOL.pack(False)
assert datafile.lower_bounds[2] == STRUCT_INT32.pack(23)
assert datafile.lower_bounds[3] == STRUCT_INT64.pack(2)
assert datafile.lower_bounds[4] == STRUCT_FLOAT.pack(454.1223)
assert datafile.lower_bounds[5] == STRUCT_DOUBLE.pack(-43.9)
assert datafile.lower_bounds[6] == STRUCT_INT32.pack(date_to_days(date(2022, 1, 2)))
assert datafile.lower_bounds[7] == STRUCT_INT64.pack(time_to_micros(time(13, 21, 4)))
assert datafile.lower_bounds[8] == STRUCT_INT64.pack(datetime_to_micros(datetime(2022, 1, 2, 17, 30, 34, 399)))
assert datafile.lower_bounds[9] == STRUCT_INT64.pack(datetime_to_micros(datetime(2022, 1, 2, 17, 30, 34, 399, tz)))
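    # truncate(2) applies to the string and binary columns: "hello" and b"hello" shrink to "he"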
assert datafile.lower_bounds[10] == b"he"
assert datafile.lower_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "foo").bytes
assert datafile.lower_bounds[12] == b"he"
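    # Decimal bounds hold the unscaled value as big-endian two's-complement bytes; they are
    # reversed and padded here to compare against the little-endian struct packing, and the
    # decimal(19, 6) bounds are checked through their signed integer value instead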
assert datafile.lower_bounds[13][::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(12345)
assert datafile.lower_bounds[14][::-1].ljust(8, b"\x00") == STRUCT_INT64.pack(12345679123456)
assert str(int.from_bytes(datafile.lower_bounds[15], byteorder="big", signed=True)).encode("utf-8") == b"1234567890123123456"
assert len(datafile.upper_bounds) == 15
assert datafile.upper_bounds[1] == STRUCT_BOOL.pack(True)
assert datafile.upper_bounds[2] == STRUCT_INT32.pack(89)
assert datafile.upper_bounds[3] == STRUCT_INT64.pack(54)
assert datafile.upper_bounds[4] == STRUCT_FLOAT.pack(24342.29)
assert datafile.upper_bounds[5] == STRUCT_DOUBLE.pack(8542.12)
assert datafile.upper_bounds[6] == STRUCT_INT32.pack(date_to_days(date(2023, 2, 4)))
assert datafile.upper_bounds[7] == STRUCT_INT64.pack(time_to_micros(time(17, 30, 34)))
assert datafile.upper_bounds[8] == STRUCT_INT64.pack(datetime_to_micros(datetime(2023, 2, 4, 13, 21, 4, 354)))
assert datafile.upper_bounds[9] == STRUCT_INT64.pack(datetime_to_micros(datetime(2023, 2, 4, 13, 21, 4, 354, tz)))
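    # Upper bounds increment the last retained character, so the truncated "wo" becomes "wp"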
assert datafile.upper_bounds[10] == b"wp"
assert datafile.upper_bounds[11] == uuid.uuid3(uuid.NAMESPACE_DNS, "bar").bytes
assert datafile.upper_bounds[12] == b"wp"
assert datafile.upper_bounds[13][::-1].ljust(4, b"\x00") == STRUCT_INT32.pack(67891)
assert datafile.upper_bounds[14][::-1].ljust(8, b"\x00") == STRUCT_INT64.pack(67891234678912)
assert str(int.from_bytes(datafile.upper_bounds[15], byteorder="big", signed=True)).encode("utf-8") == b"9876543210703654321"


def construct_test_table_invalid_upper_bound() -> Tuple[pq.FileMetaData, Union[TableMetadataV1, TableMetadataV2]]:
table_metadata = {
"format-version": 2,
"location": "s3://bucket/test/location",
"last-column-id": 7,
"current-schema-id": 0,
"schemas": [
{
"type": "struct",
"schema-id": 0,
"fields": [
{"id": 1, "name": "valid_upper_binary", "required": False, "type": "binary"},
{"id": 2, "name": "invalid_upper_binary", "required": False, "type": "binary"},
{"id": 3, "name": "valid_upper_string", "required": False, "type": "string"},
{"id": 4, "name": "invalid_upper_string", "required": False, "type": "string"},
],
},
],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": []}],
"properties": {},
}
table_metadata = TableMetadataUtil.parse_obj(table_metadata)
arrow_schema = schema_to_pyarrow(table_metadata.schemas[0])
valid_binaries = [b"\x00\x00\x00", b"\xff\xfe\x00"]
invalid_binaries = [b"\x00\x00\x00", b"\xff\xff\x00"]
valid_strings = ["\x00\x00\x00", "".join([chr(0x10FFFF), chr(0x10FFFE), chr(0x0)])]
invalid_strings = ["\x00\x00\x00", "".join([chr(0x10FFFF), chr(0x10FFFF), chr(0x0)])]
table = pa.Table.from_pydict(
{
"valid_upper_binary": valid_binaries,
"invalid_upper_binary": invalid_binaries,
"valid_upper_string": valid_strings,
"invalid_upper_string": invalid_strings,
},
schema=arrow_schema,
)
metadata_collector: List[Any] = []
with pa.BufferOutputStream() as f:
with pq.ParquetWriter(f, table.schema, metadata_collector=metadata_collector) as writer:
writer.write_table(table)
return metadata_collector[0], table_metadata


def test_metrics_invalid_upper_bound() -> None:
metadata, table_metadata = construct_test_table_invalid_upper_bound()
schema = get_current_schema(table_metadata)
table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)"
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
assert len(datafile.value_counts) == 4
assert len(datafile.null_value_counts) == 4
assert len(datafile.nan_value_counts) == 0
assert len(datafile.lower_bounds) == 4
assert datafile.lower_bounds[1] == b"\x00\x00"
assert datafile.lower_bounds[2] == b"\x00\x00"
assert datafile.lower_bounds[3] == b"\x00\x00"
assert datafile.lower_bounds[4] == b"\x00\x00"
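    # A truncated upper bound is dropped when it cannot be safely incremented: b"\xff\xff" and
    # a string of U+10FFFF code points have no larger value, so fields 2 and 4 get no upper bound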
assert len(datafile.upper_bounds) == 2
assert datafile.upper_bounds[1] == b"\xff\xff"
assert datafile.upper_bounds[3] == "".join([chr(0x10FFFF), chr(0x10FFFF)]).encode()


def test_offsets() -> None:
metadata, table_metadata = construct_test_table()
schema = get_current_schema(table_metadata)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
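    # A Parquet file starts with the 4-byte "PAR1" magic, so the single row group begins at offset 4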
assert datafile.split_offsets is not None
assert len(datafile.split_offsets) == 1
assert datafile.split_offsets[0] == 4


def test_write_and_read_stats_schema(table_schema_nested: Schema) -> None:
tbl = pa.Table.from_pydict(
{
"foo": ["a", "b"],
"bar": [1, 2],
"baz": [False, True],
"qux": [["a", "b"], ["c", "d"]],
"quux": [[("a", (("aa", 1), ("ab", 2)))], [("b", (("ba", 3), ("bb", 4)))]],
"location": [[(52.377956, 4.897070), (4.897070, -122.431297)], [(43.618881, -116.215019), (41.881832, -87.623177)]],
"person": [("Fokko", 33), ("Max", 42)], # Possible data quality issue
},
schema=schema_to_pyarrow(table_schema_nested),
)
stats_columns = pre_order_visit(table_schema_nested, PyArrowStatisticsCollector(table_schema_nested, {}))
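    # The collector emits one entry per leaf (primitive) field, which should match the
    # number of leaf columns Parquet writes for the nested schema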
visited_paths = []
def file_visitor(written_file: Any) -> None:
visited_paths.append(written_file)
with tempfile.TemporaryDirectory() as tmpdir:
pq.write_to_dataset(tbl, tmpdir, file_visitor=file_visitor)
assert visited_paths[0].metadata.num_columns == len(stats_columns)


def test_stats_types(table_schema_nested: Schema) -> None:
stats_columns = pre_order_visit(table_schema_nested, PyArrowStatisticsCollector(table_schema_nested, {}))
# the field-ids should be sorted
assert all(stats_columns[i].field_id <= stats_columns[i + 1].field_id for i in range(len(stats_columns) - 1))
assert [col.iceberg_type for col in stats_columns] == [
StringType(),
IntegerType(),
BooleanType(),
StringType(),
StringType(),
StringType(),
IntegerType(),
FloatType(),
FloatType(),
StringType(),
IntegerType(),
]


def test_read_missing_statistics() -> None:
    # write statistics only for the "strings" column
metadata, table_metadata = construct_test_table(write_statistics=["strings"])
# expect only "strings" column to have statistics in metadata
# and all other columns to have no statistics
for r in range(metadata.num_row_groups):
for pos in range(metadata.num_columns):
if metadata.row_group(r).column(pos).path_in_schema == "strings":
assert metadata.row_group(r).column(pos).is_stats_set is True
assert metadata.row_group(r).column(pos).statistics is not None
else:
assert metadata.row_group(r).column(pos).is_stats_set is False
assert metadata.row_group(r).column(pos).statistics is None
schema = get_current_schema(table_metadata)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
datafile = DataFile.from_args(**statistics.to_serialized_dict())
# expect only "strings" column values to be reflected in the
# upper_bound, lower_bound and null_value_counts props of datafile
string_col_idx = 1
assert len(datafile.lower_bounds) == 1
assert datafile.lower_bounds[string_col_idx].decode() == "aaaaaaaaaaaaaaaa"
assert len(datafile.upper_bounds) == 1
assert datafile.upper_bounds[string_col_idx].decode() == "zzzzzzzzzzzzzzz{"
assert len(datafile.null_value_counts) == 1
assert datafile.null_value_counts[string_col_idx] == 1


# This is commented out for now because write_to_dataset drops the partition
# columns, making it harder to calculate the mapping from the column index to
# datatype id
#
# def test_dataset() -> pa.Buffer:
# table_metadata = {
# "format-version": 2,
# "location": "s3://bucket/test/location",
# "last-column-id": 7,
# "current-schema-id": 0,
# "schemas": [
# {
# "type": "struct",
# "schema-id": 0,
# "fields": [
# {"id": 1, "name": "ints", "required": False, "type": "long"},
# {"id": 2, "name": "even", "required": False, "type": "boolean"},
# ],
# },
# ],
# "default-spec-id": 0,
# "partition-specs": [{"spec-id": 0, "fields": []}],
# "properties": {},
# }
# table_metadata = TableMetadataUtil.parse_obj(table_metadata)
# schema = schema_to_pyarrow(table_metadata.schemas[0])
# _ints = [0, 2, 4, 8, 1, 3, 5, 7]
# parity = [True, True, True, True, False, False, False, False]
# table = pa.Table.from_pydict({"ints": _ints, "even": parity}, schema=schema)
# visited_paths = []
# def file_visitor(written_file: Any) -> None:
# visited_paths.append(written_file)
# with TemporaryDirectory() as tmpdir:
# pq.write_to_dataset(table, tmpdir, partition_cols=["even"], file_visitor=file_visitor)
# even = None
# odd = None
# assert len(visited_paths) == 2
# for written_file in visited_paths:
# df = DataFile()
# fill_parquet_file_metadata(df, written_file.metadata, written_file.size, table_metadata)
# if "even=true" in written_file.path:
# even = df
# if "even=false" in written_file.path:
# odd = df
# assert even is not None
# assert odd is not None
# assert len(even.value_counts) == 1
# assert even.value_counts[1] == 4
# assert len(even.lower_bounds) == 1
# assert even.lower_bounds[1] == STRUCT_INT64.pack(0)
# assert len(even.upper_bounds) == 1
# assert even.upper_bounds[1] == STRUCT_INT64.pack(8)
# assert len(odd.value_counts) == 1
# assert odd.value_counts[1] == 4
# assert len(odd.lower_bounds) == 1
# assert odd.lower_bounds[1] == STRUCT_INT64.pack(1)
# assert len(odd.upper_bounds) == 1
# assert odd.upper_bounds[1] == STRUCT_INT64.pack(7)