blob: 9d5772d01c88f5e99c62fa9b9637f7beffd90e80 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=protected-access,unused-argument,redefined-outer-name
import re
from typing import Any
import pyarrow as pa
import pytest
from pyiceberg.expressions import (
And,
BoundEqualTo,
BoundGreaterThan,
BoundIsNaN,
BoundIsNull,
BoundReference,
Not,
Or,
)
from pyiceberg.expressions.literals import literal
from pyiceberg.io.pyarrow import (
UnsupportedPyArrowTypeException,
_ConvertToArrowSchema,
_ConvertToIceberg,
_ConvertToIcebergWithoutIDs,
_expression_to_complementary_pyarrow,
_HasIds,
_NullNaNUnmentionedTermsCollector,
_pyarrow_schema_ensure_large_types,
_pyarrow_schema_ensure_small_types,
pyarrow_to_schema,
schema_to_pyarrow,
visit_pyarrow,
)
from pyiceberg.schema import Accessor, Schema, visit
from pyiceberg.table.name_mapping import MappedField, NameMapping
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IcebergType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
)
def test_pyarrow_binary_to_iceberg() -> None:
    """A fixed-width pa.binary round-trips through Iceberg FixedType."""
    width = 23
    arrow_type = pa.binary(width)
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == FixedType(width)
    # Converting back to Arrow must yield the original fixed-size binary type.
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_decimal128_to_iceberg() -> None:
    """pa.decimal128 round-trips through Iceberg DecimalType with the same precision/scale."""
    arrow_type = pa.decimal128(26, 20)
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == DecimalType(26, 20)
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_decimal256_to_iceberg() -> None:
    """pa.decimal256 has no Iceberg equivalent and must raise a TypeError."""
    with pytest.raises(TypeError, match=re.escape("Unsupported type: decimal256(26, 20)")):
        visit_pyarrow(pa.decimal256(26, 20), _ConvertToIceberg())
def test_pyarrow_boolean_to_iceberg() -> None:
    """pa.bool_ round-trips through Iceberg BooleanType."""
    arrow_type = pa.bool_()
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == BooleanType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_int8_to_iceberg() -> None:
    """pa.int8 widens to Iceberg IntegerType (no round-trip: Iceberg has no 8-bit int)."""
    assert visit_pyarrow(pa.int8(), _ConvertToIceberg()) == IntegerType()
def test_pyarrow_int16_to_iceberg() -> None:
    """pa.int16 widens to Iceberg IntegerType (no round-trip: Iceberg has no 16-bit int)."""
    assert visit_pyarrow(pa.int16(), _ConvertToIceberg()) == IntegerType()
def test_pyarrow_int32_to_iceberg() -> None:
    """pa.int32 round-trips through Iceberg IntegerType."""
    arrow_type = pa.int32()
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == IntegerType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_int64_to_iceberg() -> None:
    """pa.int64 round-trips through Iceberg LongType."""
    arrow_type = pa.int64()
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == LongType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_float32_to_iceberg() -> None:
    """pa.float32 round-trips through Iceberg FloatType."""
    arrow_type = pa.float32()
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == FloatType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_float64_to_iceberg() -> None:
    """pa.float64 round-trips through Iceberg DoubleType."""
    arrow_type = pa.float64()
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == DoubleType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_date32_to_iceberg() -> None:
    """pa.date32 round-trips through Iceberg DateType."""
    arrow_type = pa.date32()
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == DateType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_date64_to_iceberg() -> None:
    """pa.date64 (millisecond dates) is unsupported and must raise a TypeError."""
    with pytest.raises(TypeError, match=re.escape("Unsupported type: date64")):
        visit_pyarrow(pa.date64(), _ConvertToIceberg())
def test_pyarrow_time32_to_iceberg() -> None:
    """pa.time32 is unsupported for both of its units ('ms' and 's')."""
    for unit in ("ms", "s"):
        with pytest.raises(TypeError, match=re.escape(f"Unsupported type: time32[{unit}]")):
            visit_pyarrow(pa.time32(unit), _ConvertToIceberg())
def test_pyarrow_time64_us_to_iceberg() -> None:
    """Microsecond pa.time64 round-trips through Iceberg TimeType."""
    arrow_type = pa.time64("us")
    iceberg_type = visit_pyarrow(arrow_type, _ConvertToIceberg())
    assert iceberg_type == TimeType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == arrow_type
def test_pyarrow_time64_ns_to_iceberg() -> None:
    """Nanosecond pa.time64 is unsupported and must raise a TypeError."""
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time64[ns]")):
        visit_pyarrow(pa.time64("ns"), _ConvertToIceberg())
@pytest.mark.parametrize("precision", ["s", "ms", "us", "ns"])
def test_pyarrow_timestamp_to_iceberg(precision: str) -> None:
    """Every naive-timestamp unit converts to TimestampType when ns-downcasting is enabled."""
    iceberg_type = visit_pyarrow(pa.timestamp(unit=precision), _ConvertToIceberg(downcast_ns_timestamp_to_us=True))
    assert iceberg_type == TimestampType()
    # Regardless of the source unit, converting back to Arrow always yields 'us' precision.
    assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us")
def test_pyarrow_timestamp_invalid_units() -> None:
    """Without the downcast option, a naive 'ns' timestamp raises a TypeError pointing at the config flag."""
    expected_message = "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
    with pytest.raises(TypeError, match=re.escape(expected_message)):
        visit_pyarrow(pa.timestamp(unit="ns"), _ConvertToIceberg())
def test_pyarrow_timestamp_tz_to_iceberg() -> None:
    """Both UTC spellings ('UTC' and '+00:00') map to TimestamptzType and normalize back to tz='UTC'."""
    for tz in ("UTC", "+00:00"):
        iceberg_type = visit_pyarrow(pa.timestamp(unit="us", tz=tz), _ConvertToIceberg())
        assert iceberg_type == TimestamptzType()
        # The reverse conversion always renders the zone as 'UTC'.
        assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="UTC")
def test_pyarrow_timestamp_tz_invalid_units() -> None:
    """A zoned 'ns' timestamp raises the same downcast-hint TypeError as the naive variant."""
    expected_message = "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
    with pytest.raises(TypeError, match=re.escape(expected_message)):
        visit_pyarrow(pa.timestamp(unit="ns", tz="UTC"), _ConvertToIceberg())
def test_pyarrow_timestamp_tz_invalid_tz() -> None:
    """A non-UTC time zone is rejected with a TypeError."""
    with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[us, tz=US/Pacific]")):
        visit_pyarrow(pa.timestamp(unit="us", tz="US/Pacific"), _ConvertToIceberg())
@pytest.mark.parametrize("pyarrow_type", [pa.string(), pa.large_string(), pa.string_view()])
def test_pyarrow_string_to_iceberg(pyarrow_type: pa.DataType) -> None:
    """All Arrow string flavors collapse to StringType; the reverse conversion picks large_string."""
    iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg())
    assert iceberg_type == StringType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.large_string()
@pytest.mark.parametrize("pyarrow_type", [pa.binary(), pa.large_binary(), pa.binary_view()])
def test_pyarrow_variable_binary_to_iceberg(pyarrow_type: pa.DataType) -> None:
    """All variable-width Arrow binary flavors collapse to BinaryType; reverse picks large_binary."""
    iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg())
    assert iceberg_type == BinaryType()
    assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.large_binary()
def test_pyarrow_struct_to_iceberg() -> None:
    """Struct fields carry over ids, nullability, and docs from PARQUET metadata."""
    arrow_struct = pa.struct(
        [
            pa.field("foo", pa.string(), nullable=True, metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
            pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "2"}),
            pa.field("baz", pa.bool_(), nullable=True, metadata={"PARQUET:field_id": "3"}),
        ]
    )
    # nullable=True maps to required=False and vice versa; the "doc" metadata becomes the field doc.
    expected_struct = StructType(
        NestedField(field_id=1, name="foo", field_type=StringType(), required=False, doc="foo doc"),
        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
        NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
    )
    assert visit_pyarrow(arrow_struct, _ConvertToIceberg()) == expected_struct
def test_pyarrow_list_to_iceberg() -> None:
    """A pa.list_ with an id-annotated element field converts to an Iceberg ListType."""
    element = pa.field("element", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"})
    converted = visit_pyarrow(pa.list_(element), _ConvertToIceberg())
    assert converted == ListType(element_id=1, element_type=IntegerType(), element_required=True)
def test_pyarrow_large_list_to_iceberg() -> None:
    """pa.large_list converts to the same Iceberg ListType as a regular list."""
    element = pa.field("element", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"})
    converted = visit_pyarrow(pa.large_list(element), _ConvertToIceberg())
    assert converted == ListType(element_id=1, element_type=IntegerType(), element_required=True)
def test_pyarrow_fixed_size_list_to_iceberg() -> None:
    """A fixed-size pa.list_ converts to an ordinary (unsized) Iceberg ListType."""
    element = pa.field("element", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"})
    converted = visit_pyarrow(pa.list_(element, 1), _ConvertToIceberg())
    assert converted == ListType(element_id=1, element_type=IntegerType(), element_required=True)
def test_pyarrow_map_to_iceberg() -> None:
    """pa.map_ with id-annotated key/value fields converts to an Iceberg MapType."""
    key_field = pa.field("key", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"})
    value_field = pa.field("value", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"})
    converted = visit_pyarrow(pa.map_(key_field, value_field), _ConvertToIceberg())
    assert converted == MapType(
        key_id=1,
        key_type=IntegerType(),
        value_id=2,
        value_type=StringType(),
        value_required=True,
    )
@pytest.mark.parametrize(
    "value_type, expected_result",
    [
        (pa.string(), StringType()),
        (pa.int32(), IntegerType()),
        (pa.float64(), DoubleType()),
    ],
)
def test_pyarrow_dictionary_encoded_type_to_iceberg(value_type: pa.DataType, expected_result: IcebergType) -> None:
    """A dictionary-encoded column converts to the Iceberg type of its value type."""
    dictionary_type = pa.dictionary(pa.int32(), value_type)
    assert visit_pyarrow(dictionary_type, _ConvertToIceberg()) == expected_result
def test_round_schema_conversion_simple(table_schema_simple: Schema) -> None:
    """Iceberg -> Arrow -> Iceberg round trip preserves the simple schema's rendering."""
    round_tripped = pyarrow_to_schema(schema_to_pyarrow(table_schema_simple))
    expected = """table {
  1: foo: optional string
  2: bar: required int
  3: baz: optional boolean
}"""
    assert str(round_tripped) == expected
def test_round_schema_conversion_nested(table_schema_nested: Schema) -> None:
    """Iceberg -> Arrow -> Iceberg round trip preserves nested lists, maps, and structs."""
    round_tripped = pyarrow_to_schema(schema_to_pyarrow(table_schema_nested))
    expected = """table {
  1: foo: optional string
  2: bar: required int
  3: baz: optional boolean
  4: qux: required list<string>
  6: quux: required map<string, map<string, int>>
  11: location: required list<struct<13: latitude: optional float, 14: longitude: optional float>>
  15: person: optional struct<16: name: optional string, 17: age: required int>
}"""
    assert str(round_tripped) == expected
def test_round_schema_large_string() -> None:
    """large_string converts to plain string when ids come from a name mapping."""
    arrow_schema = pa.schema([pa.field("animals", pa.large_string())])
    mapping = NameMapping([MappedField(field_id=1, names=["animals"])])
    expected = """table {
  1: animals: optional string
}"""
    assert str(pyarrow_to_schema(arrow_schema, name_mapping=mapping)) == expected
def test_simple_schema_has_missing_ids() -> None:
    """_HasIds is False for a schema whose only field carries no field-id metadata."""
    schema = pa.schema([pa.field("foo", pa.string(), nullable=False)])
    assert not visit_pyarrow(schema, _HasIds())
def test_simple_schema_has_missing_ids_partial() -> None:
    """_HasIds is False when even one field lacks a field id."""
    schema = pa.schema(
        [
            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
            # "bar" deliberately has no PARQUET:field_id metadata.
            pa.field("bar", pa.int32(), nullable=False),
        ]
    )
    assert not visit_pyarrow(schema, _HasIds())
def test_nested_schema_has_missing_ids() -> None:
    """_HasIds is False when nested map fields carry no field ids at all."""
    nested_map = pa.map_(
        pa.string(),
        pa.map_(pa.string(), pa.int32()),
    )
    schema = pa.schema(
        [
            pa.field("foo", pa.string(), nullable=False),
            pa.field("quux", nested_map, nullable=False),
        ]
    )
    assert not visit_pyarrow(schema, _HasIds())
def test_nested_schema_has_ids() -> None:
    """_HasIds reports True when every field — including nested map keys and values — carries a PARQUET:field_id."""
    schema = pa.schema(
        [
            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
            pa.field(
                "quux",
                # map<string, map<string, int>> with ids on the outer key (7), outer value (8),
                # inner key (9), and inner value (10).
                pa.map_(
                    pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "7"}),
                    pa.field(
                        "value",
                        pa.map_(
                            pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "9"}),
                            pa.field("value", pa.int32(), metadata={"PARQUET:field_id": "10"}),
                        ),
                        nullable=False,
                        metadata={"PARQUET:field_id": "8"},
                    ),
                ),
                nullable=False,
                metadata={"PARQUET:field_id": "6", "doc": "quux doc"},
            ),
        ]
    )
    visitor = _HasIds()
    has_ids = visit_pyarrow(schema, visitor)
    assert has_ids
def test_nested_schema_has_partial_missing_ids() -> None:
    """_HasIds reports False when only the inner map's key/value fields lack ids."""
    schema = pa.schema(
        [
            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1", "doc": "foo doc"}),
            pa.field(
                "quux",
                pa.map_(
                    pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "7"}),
                    pa.field(
                        "value",
                        # The inner map's key and value deliberately have no field ids.
                        pa.map_(pa.field("key", pa.string(), nullable=False), pa.field("value", pa.int32())),
                        nullable=False,
                    ),
                ),
                nullable=False,
                metadata={"PARQUET:field_id": "6", "doc": "quux doc"},
            ),
        ]
    )
    visitor = _HasIds()
    has_ids = visit_pyarrow(schema, visitor)
    assert not has_ids
def test_pyarrow_schema_to_schema_missing_ids_and_name_mapping(pyarrow_schema_simple_without_ids: pa.Schema) -> None:
    """Converting an id-less schema without a name mapping raises a descriptive ValueError."""
    with pytest.raises(ValueError) as exc_info:
        _ = pyarrow_to_schema(pyarrow_schema_simple_without_ids)
    expected_fragment = (
        "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
    )
    assert expected_fragment in str(exc_info.value)
def test_simple_pyarrow_schema_to_schema_missing_ids_using_name_mapping(
    pyarrow_schema_simple_without_ids: pa.Schema, iceberg_schema_simple: Schema
) -> None:
    """An id-less schema converts correctly when a complete name mapping supplies the ids."""
    mapping = NameMapping(
        [
            MappedField(field_id=1, names=["foo"]),
            MappedField(field_id=2, names=["bar"]),
            MappedField(field_id=3, names=["baz"]),
        ]
    )
    assert pyarrow_to_schema(pyarrow_schema_simple_without_ids, mapping) == iceberg_schema_simple
def test_simple_pyarrow_schema_to_schema_missing_ids_using_name_mapping_partial_exception(
    pyarrow_schema_simple_without_ids: pa.Schema,
) -> None:
    """A name mapping that covers only some columns raises for the first unmapped one."""
    # Only "foo" is mapped; "bar" and "baz" are not.
    mapping = NameMapping([MappedField(field_id=1, names=["foo"])])
    with pytest.raises(ValueError) as exc_info:
        _ = pyarrow_to_schema(pyarrow_schema_simple_without_ids, mapping)
    assert "Could not find field with name: bar" in str(exc_info.value)
def test_nested_pyarrow_schema_to_schema_missing_ids_using_name_mapping(
    pyarrow_schema_nested_without_ids: pa.Schema, iceberg_schema_nested: Schema
) -> None:
    """An id-less nested schema converts correctly when the name mapping covers every
    level: list elements, map keys/values (including a nested map), and struct members."""
    schema = pyarrow_schema_nested_without_ids
    name_mapping = NameMapping(
        [
            MappedField(field_id=1, names=["foo"]),
            MappedField(field_id=2, names=["bar"]),
            MappedField(field_id=3, names=["baz"]),
            # List field: the element gets its own mapped id.
            MappedField(field_id=4, names=["qux"], fields=[MappedField(field_id=5, names=["element"])]),
            # Map of map: outer key/value plus the inner map's key/value.
            MappedField(
                field_id=6,
                names=["quux"],
                fields=[
                    MappedField(field_id=7, names=["key"]),
                    MappedField(
                        field_id=8,
                        names=["value"],
                        fields=[
                            MappedField(field_id=9, names=["key"]),
                            MappedField(field_id=10, names=["value"]),
                        ],
                    ),
                ],
            ),
            # List of struct: element id 12 wraps the struct members.
            MappedField(
                field_id=11,
                names=["location"],
                fields=[
                    MappedField(
                        field_id=12,
                        names=["element"],
                        fields=[
                            MappedField(field_id=13, names=["latitude"]),
                            MappedField(field_id=14, names=["longitude"]),
                        ],
                    )
                ],
            ),
            MappedField(
                field_id=15,
                names=["person"],
                fields=[
                    MappedField(field_id=16, names=["name"]),
                    MappedField(field_id=17, names=["age"]),
                ],
            ),
        ]
    )
    assert pyarrow_to_schema(schema, name_mapping) == iceberg_schema_nested
def test_pyarrow_schema_to_schema_missing_ids_using_name_mapping_nested_missing_id() -> None:
    """A name mapping that omits a deeply nested field raises with the field's full dotted path."""
    schema = pa.schema(
        [
            pa.field("foo", pa.string(), nullable=False),
            pa.field(
                "quux",
                pa.map_(
                    pa.string(),
                    pa.map_(pa.string(), pa.int32()),
                ),
                nullable=False,
            ),
        ]
    )
    name_mapping = NameMapping(
        [
            MappedField(field_id=1, names=["foo"]),
            MappedField(
                field_id=6,
                names=["quux"],
                fields=[
                    MappedField(field_id=7, names=["key"]),
                    MappedField(
                        field_id=8,
                        names=["value"],
                        fields=[
                            # The inner map's "key" is intentionally not mapped.
                            MappedField(field_id=10, names=["value"]),
                        ],
                    ),
                ],
            ),
        ]
    )
    with pytest.raises(ValueError) as exc_info:
        _ = pyarrow_to_schema(schema, name_mapping)
    assert "Could not find field with name: quux.value.key" in str(exc_info.value)
def test_pyarrow_schema_to_schema_fresh_ids_simple_schema(
    pyarrow_schema_simple_without_ids: pa.Schema, iceberg_schema_simple_no_ids: Schema
) -> None:
    """_ConvertToIcebergWithoutIDs converts a simple id-less schema as-is (no ids assigned)."""
    converted = visit_pyarrow(pyarrow_schema_simple_without_ids, _ConvertToIcebergWithoutIDs())
    assert converted == iceberg_schema_simple_no_ids
def test_pyarrow_schema_to_schema_fresh_ids_nested_schema(
    pyarrow_schema_nested_without_ids: pa.Schema, iceberg_schema_nested_no_ids: Schema
) -> None:
    """_ConvertToIcebergWithoutIDs handles nested id-less schemas the same way."""
    converted = visit_pyarrow(pyarrow_schema_nested_without_ids, _ConvertToIcebergWithoutIDs())
    assert converted == iceberg_schema_nested_no_ids
def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
    """_pyarrow_schema_ensure_large_types rewrites every string -> large_string and
    list -> large_list throughout the schema, leaving all other types and nullability intact."""
    expected_schema = pa.schema(
        [
            pa.field("foo", pa.large_string(), nullable=False),
            pa.field("bar", pa.int32(), nullable=False),
            pa.field("baz", pa.bool_(), nullable=True),
            pa.field("qux", pa.large_list(pa.large_string()), nullable=False),
            pa.field(
                "quux",
                # Map keys/values are widened too, but the map container itself stays pa.map_.
                pa.map_(
                    pa.large_string(),
                    pa.map_(pa.large_string(), pa.int32()),
                ),
                nullable=False,
            ),
            pa.field(
                "location",
                pa.large_list(
                    pa.struct(
                        [
                            pa.field("latitude", pa.float32(), nullable=False),
                            pa.field("longitude", pa.float32(), nullable=False),
                        ]
                    ),
                ),
                nullable=False,
            ),
            pa.field(
                "person",
                pa.struct(
                    [
                        pa.field("name", pa.large_string(), nullable=True),
                        pa.field("age", pa.int32(), nullable=False),
                    ]
                ),
                nullable=True,
            ),
        ]
    )
    assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema
def test_pyarrow_schema_unsupported_type() -> None:
    """Unsupported Arrow types surface as UnsupportedPyArrowTypeException with the
    offending field attached and the underlying TypeError chained as __cause__.

    Covers three scenarios: a decimal256 nested in a struct inside a list, a
    decimal256 nested inside a map-of-map, and a top-level 'ns' timestamp.
    """
    # Scenario 1: decimal256 buried inside list<struct<...>> — the exception names
    # the leaf column ("latitude"), not the containers.
    unsupported_field = pa.field("latitude", pa.decimal256(20, 26), nullable=False, metadata={"PARQUET:field_id": "2"})
    schema = pa.schema(
        [
            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"}),
            pa.field(
                "location",
                pa.large_list(
                    pa.field(
                        "item",
                        pa.struct(
                            [
                                unsupported_field,
                                pa.field("longitude", pa.float32(), nullable=False, metadata={"PARQUET:field_id": "3"}),
                            ]
                        ),
                        metadata={"PARQUET:field_id": "4"},
                    )
                ),
                nullable=False,
                metadata={"PARQUET:field_id": "5"},
            ),
        ],
        metadata={"PARQUET:field_id": "6"},
    )
    with pytest.raises(
        UnsupportedPyArrowTypeException, match=re.escape("Column 'latitude' has an unsupported type: decimal256(20, 26)")
    ) as exc_info:
        pyarrow_to_schema(schema)
    assert exc_info.value.field == unsupported_field
    exception_cause = exc_info.value.__cause__
    assert isinstance(exception_cause, TypeError)
    assert "Unsupported type: decimal256(20, 26)" in exception_cause.args[0]
    # Scenario 2: decimal256 inside map<string, map<string, decimal256>> — here the
    # exception names the top-level map column ("quux") and renders the full map type.
    unsupported_field = pa.field(
        "quux",
        pa.map_(
            pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
            pa.field(
                "value",
                pa.map_(
                    pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "5"}),
                    pa.field("value", pa.decimal256(2, 3), metadata={"PARQUET:field_id": "6"}),
                ),
                nullable=False,
                metadata={"PARQUET:field_id": "4"},
            ),
        ),
        nullable=False,
        metadata={"PARQUET:field_id": "3"},
    )
    schema = pa.schema(
        [
            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"}),
            unsupported_field,
        ]
    )
    with pytest.raises(
        UnsupportedPyArrowTypeException,
        match=re.escape("Column 'quux' has an unsupported type: map<string, map<string, decimal256(2, 3)>>"),
    ) as exc_info:
        pyarrow_to_schema(schema)
    assert exc_info.value.field == unsupported_field
    exception_cause = exc_info.value.__cause__
    assert isinstance(exception_cause, TypeError)
    assert "Unsupported type: decimal256(2, 3)" in exception_cause.args[0]
    # Scenario 3: top-level 'ns' timestamp — the chained TypeError carries the
    # downcast-configuration hint.
    unsupported_field = pa.field("foo", pa.timestamp(unit="ns"), nullable=False, metadata={"PARQUET:field_id": "1"})
    schema = pa.schema(
        [
            unsupported_field,
            pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "2"}),
        ]
    )
    with pytest.raises(
        UnsupportedPyArrowTypeException,
        match=re.escape("Column 'foo' has an unsupported type: timestamp[ns]"),
    ) as exc_info:
        pyarrow_to_schema(schema)
    assert exc_info.value.field == unsupported_field
    exception_cause = exc_info.value.__cause__
    assert isinstance(exception_cause, TypeError)
    assert "Iceberg does not yet support 'ns' timestamp precision" in exception_cause.args[0]
def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
    """Widening to large types and then narrowing back reproduces the original schema."""
    widened = _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids)
    narrowed = _pyarrow_schema_ensure_small_types(widened)
    assert narrowed == pyarrow_schema_nested_without_ids
@pytest.fixture
def bound_reference_str() -> BoundReference[Any]:
    """Bound reference to an optional string field (id 1) at accessor position 0."""
    string_field = NestedField(1, "string_field", StringType(), required=False)
    return BoundReference(field=string_field, accessor=Accessor(position=0, inner=None))
@pytest.fixture
def bound_reference_float() -> BoundReference[Any]:
    """Bound reference to an optional float field (id 2) at accessor position 1."""
    float_field = NestedField(2, "float_field", FloatType(), required=False)
    return BoundReference(field=float_field, accessor=Accessor(position=1, inner=None))
@pytest.fixture
def bound_reference_double() -> BoundReference[Any]:
    """Bound reference to an optional double field (id 3) at accessor position 2."""
    double_field = NestedField(3, "double_field", DoubleType(), required=False)
    return BoundReference(field=double_field, accessor=Accessor(position=2, inner=None))
@pytest.fixture
def bound_eq_str_field(bound_reference_str: BoundReference[Any]) -> BoundEqualTo[Any]:
    """Predicate: string_field == "hello"."""
    return BoundEqualTo(term=bound_reference_str, literal=literal("hello"))
@pytest.fixture
def bound_greater_than_float_field(bound_reference_float: BoundReference[Any]) -> BoundGreaterThan[Any]:
    """Predicate: float_field > 100."""
    return BoundGreaterThan(term=bound_reference_float, literal=literal(100))
@pytest.fixture
def bound_is_nan_float_field(bound_reference_float: BoundReference[Any]) -> BoundIsNaN[Any]:
    """Predicate: isNaN(float_field)."""
    return BoundIsNaN(bound_reference_float)
@pytest.fixture
def bound_eq_double_field(bound_reference_double: BoundReference[Any]) -> BoundEqualTo[Any]:
    """Predicate comparing double_field to literal(False).

    NOTE(review): literal(False) against a double field renders as ``double_field == 0``
    in the pyarrow expression below — presumably an intentional coercion; confirm intent.
    """
    return BoundEqualTo(term=bound_reference_double, literal=literal(False))
@pytest.fixture
def bound_is_null_double_field(bound_reference_double: BoundReference[Any]) -> BoundIsNull[Any]:
    """Predicate: isNull(double_field)."""
    return BoundIsNull(bound_reference_double)
def test_collect_null_nan_unmentioned_terms(
    bound_eq_str_field: BoundEqualTo[Any], bound_is_nan_float_field: BoundIsNaN[Any], bound_is_null_double_field: BoundIsNull[Any]
) -> None:
    """The collector splits referenced terms into null/NaN mentioned vs. unmentioned sets."""
    bound_expr = And(
        Or(And(bound_eq_str_field, bound_is_nan_float_field), bound_is_null_double_field), Not(bound_is_nan_float_field)
    )
    collector = _NullNaNUnmentionedTermsCollector()
    collector.collect(bound_expr)
    # Only double_field appears under an isNull predicate; only float_field under isNaN.
    assert {term.ref().field.name for term in collector.null_unmentioned_bound_terms} == {"float_field", "string_field"}
    assert {term.ref().field.name for term in collector.nan_unmentioned_bound_terms} == {"string_field", "double_field"}
    assert {term.ref().field.name for term in collector.is_null_or_not_bound_terms} == {"double_field"}
    assert {term.ref().field.name for term in collector.is_nan_or_not_bound_terms} == {"float_field"}
def test_collect_null_nan_unmentioned_terms_with_multiple_predicates_on_the_same_term(
    bound_eq_str_field: BoundEqualTo[Any],
    bound_greater_than_float_field: BoundGreaterThan[Any],
    bound_is_nan_float_field: BoundIsNaN[Any],
    bound_eq_double_field: BoundEqualTo[Any],
    bound_is_null_double_field: BoundIsNull[Any],
) -> None:
    """A term referenced at several places in the tree is still collected exactly once per bucket."""
    bound_expr = And(
        Or(
            And(bound_eq_str_field, bound_greater_than_float_field),
            And(bound_is_nan_float_field, bound_eq_double_field),
            bound_greater_than_float_field,
        ),
        Not(bound_is_null_double_field),
    )
    collector = _NullNaNUnmentionedTermsCollector()
    collector.collect(bound_expr)
    # Duplicated float_field / double_field references collapse into single entries.
    assert {term.ref().field.name for term in collector.null_unmentioned_bound_terms} == {"float_field", "string_field"}
    assert {term.ref().field.name for term in collector.nan_unmentioned_bound_terms} == {"string_field", "double_field"}
    assert {term.ref().field.name for term in collector.is_null_or_not_bound_terms} == {"double_field"}
    assert {term.ref().field.name for term in collector.is_nan_or_not_bound_terms} == {"float_field"}
def test_expression_to_complementary_pyarrow(
    bound_eq_str_field: BoundEqualTo[Any],
    bound_greater_than_float_field: BoundGreaterThan[Any],
    bound_is_nan_float_field: BoundIsNaN[Any],
    bound_eq_double_field: BoundEqualTo[Any],
    bound_is_null_double_field: BoundIsNull[Any],
) -> None:
    """The complement of a bound expression is the negated expression OR'ed with
    is-null checks for null-unmentioned terms and is-nan checks for nan-unmentioned
    terms, rendered as a pyarrow.compute expression (exact repr pinned below)."""
    bound_expr = And(
        Or(
            And(bound_eq_str_field, bound_greater_than_float_field),
            And(bound_is_nan_float_field, bound_eq_double_field),
            bound_greater_than_float_field,
        ),
        Not(bound_is_null_double_field),
    )
    result = _expression_to_complementary_pyarrow(bound_expr)
    # Notice an isNan predicate on a str column is automatically converted to always false and removed from Or and thus will not appear in the pc.expr.
    assert (
        repr(result)
        == """<pyarrow.compute.Expression (((invert(((((string_field == "hello") and (float_field > 100)) or ((is_nan(float_field) and (double_field == 0)) or (float_field > 100))) and invert(is_null(double_field, {nan_is_null=false})))) or is_null(float_field, {nan_is_null=false})) or is_null(string_field, {nan_is_null=false})) or is_nan(double_field))>"""
    )