blob: 7472b2bddcc340db76f4a9ed3789e97b9f9b91fd [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from iceberg.api.expressions import Expression, Operation, Predicate
import pyarrow.dataset as ds
def get_dataset_filter(expr: Expression, expected_to_file_map: dict) -> ds.Expression:
"""
Given an Iceberg Expression and a mapping of names in the iceberg schema to the file schema,
convert to an equivalent dataset filter using the file column names. Recursively iterate through the
expressions to convert each portion one predicate at a time
Parameters
----------
expr : iceberg.api.expressions.Expression
An Iceberg Expression to be converted
expected_to_file_map : dict
A dict that maps the iceberg schema names to the names from the file schema
Returns
-------
pyarrow._dataset.Expression
An equivalent dataset expression
"""
if expr is None:
return None
if isinstance(expr, Predicate):
return predicate(expr, expected_to_file_map)
if expr.op() == Operation.TRUE:
return None
elif expr.op() == Operation.FALSE:
return False
elif expr.op() == Operation.NOT:
return not_(get_dataset_filter(expr.child, expected_to_file_map))
elif expr.op() == Operation.AND:
return and_(get_dataset_filter(expr.left, expected_to_file_map),
get_dataset_filter(expr.right, expected_to_file_map))
elif expr.op() == Operation.OR:
return or_(get_dataset_filter(expr.left, expected_to_file_map),
get_dataset_filter(expr.right, expected_to_file_map))
else:
raise RuntimeError("Unknown operation: {}".format(expr.op()))
def predicate(pred: Predicate, field_map: dict) -> ds.Expression: # noqa: ignore=C901
"""
Given an Iceberg Predicate and a mapping of names in the iceberg schema to the file schema,
convert to an equivalent dataset expression using the file column names.
Parameters
----------
pred : iceberg.api.expressions.Predicate
An Iceberg Predicate to be converted
field_map : dict
A dict that maps the iceberg schema names to the names from the file schema
Returns
-------
pyarrow._dataset.Expression
An equivalent dataset expression
"""
# get column name in the file schema so we can apply the predicate
col_name = field_map.get(pred.ref.name)
if col_name is None:
if pred.op == Operation.IS_NULL:
return ds.scalar(True) == ds.scalar(True)
return ds.scalar(True) == ds.scalar(False)
if pred.op == Operation.IS_NULL:
return ~ds.field(col_name).is_valid()
elif pred.op == Operation.NOT_NULL:
return ds.field(col_name).is_valid()
elif pred.op == Operation.LT:
return ds.field(col_name) < pred.lit.value
elif pred.op == Operation.LT_EQ:
return ds.field(col_name) <= pred.lit.value
elif pred.op == Operation.GT:
return ds.field(col_name) > pred.lit.value
elif pred.op == Operation.GT_EQ:
return ds.field(col_name) >= pred.lit.value
elif pred.op == Operation.EQ:
return ds.field(col_name) == pred.lit.value
elif pred.op == Operation.NOT_EQ:
return ds.field(col_name) != pred.lit.value
elif pred.op == Operation.IN:
return ds.field(col_name).isin(pred.lit.value)
elif pred.op == Operation.NOT_IN:
return ds.field(col_name).isin(pred.lit.value)
def and_(left: ds.Expression, right: ds.Expression) -> ds.Expression:
"""
Given a left and right expression combined them using the `AND` logical operator
Parameters
----------
left : pyarrow._dataset.Expression
A Dataset `Expression` to logically `AND`
right : pyarrow._dataset.Expression
A Dataset `Expression` to logically `AND`
Returns
-------
pyarrow._dataset.Expression
The left and right `Expression` combined with `AND`
"""
return left & right
def or_(left: ds.Expression, right: ds.Expression) -> ds.Expression:
"""
Given a left and right expression combined them using the `OR` logical operator
Parameters
----------
left : pyarrow._dataset.Expression
A Dataset `Expression` to logically `OR`
right : pyarrow._dataset.Expression
A Dataset `Expression` to logically `OR`
Returns
-------
pyarrow._dataset.Expression
The left and right `Expression` combined with `OR`
"""
return left | right
def not_(child: ds.Expression) -> ds.Expression:
"""
Given a child expression create the logical negation
Parameters
----------
child : pyarrow._dataset.Expression
A Dataset `Expression` to logically `OR`
Returns
-------
pyarrow._dataset.Expression
The negation of the input `Expression`
"""
return ~child