blob: f0f5a569e6fd3e7c177d80a1d7eafa91fa48d42f [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Column projection utilities.
A projection maps a source row type to a flat list of ``DataField``: the
columns the user wants to read. Two flavours:
* :class:`TopLevelProjection` selects fields by their top-level index.
* :class:`NestedProjection` accepts paths that walk into ROW children, e.g.
``[[1, 0], [1, 2]]`` means "the 0th and 2nd children of the field at top
level index 1". The result is flattened into top-level fields whose
names are the underscore-joined original path (``a_b`` for ``a.b``,
with a ``__N`` suffix on collisions) and whose IDs are inherited from
the leaf so schema-evolution remapping by field ID still works.
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence
from pypaimon.schema.data_types import DataField, RowType
class Projection(ABC):
    """Base class for column projections over a row type.

    Concrete subclasses decide which columns are read. Instances are
    normally obtained through the factory methods :meth:`empty`,
    :meth:`of` and :meth:`range` rather than by instantiating a subclass
    directly.
    """

    @abstractmethod
    def project(self, row_type) -> List[DataField]:
        """Resolve the projection against ``row_type``.

        Returns the selected fields as a flat list.
        """

    @abstractmethod
    def is_nested(self) -> bool:
        """Return True when at least one path descends below the top level."""

    @abstractmethod
    def to_top_level_indexes(self) -> List[int]:
        """Return the top-level positions this projection touches.

        Nested projections report each top-level index once, in the order
        the paths first reference it, which lets callers that can only
        push projection down at the top level fall back to it.
        """

    @abstractmethod
    def to_nested_indexes(self) -> List[List[int]]:
        """Return one integer path per output field."""

    @abstractmethod
    def to_name_paths(self, row_type) -> List[List[str]]:
        """Map each integer path to the matching field names in ``row_type``.

        Example: the path ``[1, 0]`` over a row whose field at index 1 is
        the struct ``mv_col`` with sub-fields ``[LATEST_VERSION, ...]``
        becomes ``["mv_col", "LATEST_VERSION"]``. Format readers use the
        result to push nested projection into the underlying engine (for
        instance PyArrow's ``ds.field(*name_path)``).
        """

    # ------------------------------------------------------------------
    # Factories
    # ------------------------------------------------------------------

    @staticmethod
    def empty() -> "Projection":
        """Return a projection that selects no columns."""
        return _EmptyProjection()

    @staticmethod
    def of(indexes_or_paths) -> "Projection":
        """Create a projection from ``int[]`` or ``int[][]`` input.

        An empty input yields :func:`empty`. The entries must all have the
        same shape: either every entry is an integer index, or every entry
        is a list/tuple path. A mixture is rejected here with ``TypeError``
        so the mistake surfaces at the call site instead of later inside
        ``project``.
        """
        if not indexes_or_paths:
            return _EmptyProjection()

        def _looks_like_path(entry) -> bool:
            return isinstance(entry, (list, tuple))

        # The first entry decides the expected shape for all others.
        wants_paths = _looks_like_path(indexes_or_paths[0])
        if any(_looks_like_path(entry) != wants_paths
               for entry in indexes_or_paths[1:]):
            raise TypeError(
                "Projection.of expects either all top-level indexes "
                "or all nested paths; got a mix")
        if not wants_paths:
            return TopLevelProjection(list(indexes_or_paths))
        return NestedProjection([list(path) for path in indexes_or_paths])

    @staticmethod
    def range(start_inclusive: int, end_exclusive: int) -> "Projection":
        """Return a top-level projection over ``[start_inclusive, end_exclusive)``.

        A non-positive span yields the empty projection.
        """
        # ``range`` below is the builtin: class scope is not visible from
        # inside a method body, so this staticmethod does not shadow it.
        return (_EmptyProjection() if end_exclusive <= start_inclusive
                else TopLevelProjection(
                    list(range(start_inclusive, end_exclusive))))
class _EmptyProjection(Projection):
    """Projection that selects nothing; every accessor yields an empty list."""

    def project(self, row_type) -> List[DataField]:
        # Nothing is selected, so ``row_type`` is never inspected.
        return list()

    def is_nested(self) -> bool:
        return False

    def to_top_level_indexes(self) -> List[int]:
        return list()

    def to_nested_indexes(self) -> List[List[int]]:
        return list()

    def to_name_paths(self, row_type) -> List[List[str]]:
        return list()
class TopLevelProjection(Projection):
    """Projection that picks whole top-level fields by position."""

    def __init__(self, indexes: Sequence[int]):
        # Take a private copy so later changes to the caller's sequence
        # do not leak into this projection.
        self.indexes = [position for position in indexes]

    def project(self, row_type) -> List[DataField]:
        """Return the top-level fields at ``self.indexes``, in that order."""
        available = _row_fields(row_type)
        return [available[position] for position in self.indexes]

    def is_nested(self) -> bool:
        return False

    def to_top_level_indexes(self) -> List[int]:
        # Returned as a copy; duplicates, if any, are kept.
        return [position for position in self.indexes]

    def to_nested_indexes(self) -> List[List[int]]:
        # Every top-level index is a path of length one.
        return [list((position,)) for position in self.indexes]

    def to_name_paths(self, row_type) -> List[List[str]]:
        available = _row_fields(row_type)
        return [[available[position].name] for position in self.indexes]
class NestedProjection(Projection):
    """Projection over index paths that may descend into ROW children.

    Each path starts at a top-level field and then follows successive ROW
    children by position. A single-element path is equivalent to a
    top-level selection.
    """

    def __init__(self, paths: Sequence[Sequence[int]]):
        """Store ``paths`` as lists of ints.

        Raises:
            ValueError: if ``paths`` is empty or any path is empty.
        """
        if not paths:
            raise ValueError("NestedProjection requires at least one path")
        self.paths = [list(p) for p in paths]
        for p in self.paths:
            if len(p) == 0:
                raise ValueError(
                    "Each projection path must have at least one index")
        # Cached once: True when some path goes below the top level.
        self._has_nested = any(len(p) > 1 for p in self.paths)

    def is_nested(self) -> bool:
        return self._has_nested

    def to_top_level_indexes(self) -> List[int]:
        """Return each path's top-level index once, in first-seen order."""
        seen = set()
        out: List[int] = []
        for p in self.paths:
            top = p[0]
            if top not in seen:
                seen.add(top)
                out.append(top)
        return out

    def to_nested_indexes(self) -> List[List[int]]:
        # Copy each path so callers cannot mutate internal state.
        return [list(p) for p in self.paths]

    @staticmethod
    def _resolve_path(fields: List[DataField], path: Sequence[int]):
        """Walk ``path`` starting from ``fields``; return ``(leaf, names)``.

        ``names`` holds the name of every field visited, top-level first,
        and ``leaf`` is the field the last index points at. This is the
        single place where paths are walked, so ``project`` and
        ``to_name_paths`` always agree.

        Raises:
            ValueError: if a non-final step lands on a field whose type is
                not row-like according to :func:`is_row_type`.
        """
        field = fields[path[0]]
        names = [field.name]
        for idx in path[1:]:
            child_type = field.type
            if not is_row_type(child_type):
                raise ValueError(
                    "Nested projection step expected a ROW type but got %s "
                    "for field '%s'" % (child_type, field.name))
            field = _row_fields(child_type)[idx]
            names.append(field.name)
        return field, names

    def to_name_paths(self, row_type) -> List[List[str]]:
        """Translate each integer path into its list of field names."""
        fields = _row_fields(row_type)
        return [self._resolve_path(fields, path)[1] for path in self.paths]

    def project(self, row_type) -> List[DataField]:
        """Flatten every path's leaf into a top-level ``DataField``.

        The output name is the underscore-joined name path. When that name
        is already taken, ``__N`` is appended, where ``N`` comes from one
        counter shared by all collisions in this call.
        """
        fields = _row_fields(row_type)
        out: List[DataField] = []
        seen_names = set()
        dup_count = 0
        for path in self.paths:
            leaf, name_parts = self._resolve_path(fields, path)
            base_name = "_".join(name_parts)
            final_name = base_name
            while final_name in seen_names:
                final_name = "%s__%d" % (base_name, dup_count)
                dup_count += 1
            seen_names.add(final_name)
            # Keep the leaf field's ID so downstream schema-evolution
            # remapping by field ID still works after rename.
            out.append(DataField(
                id=leaf.id,
                name=final_name,
                type=leaf.type,
                description=getattr(leaf, 'description', None),
                default_value=getattr(leaf, 'default_value', None),
            ))
        return out
def _row_fields(row_type) -> List[DataField]:
    """Return the fields of a row-like value as a new list.

    Accepted inputs, checked in this order:

    * a ``list`` of ``DataField``;
    * a RowType, or any other object with a non-None ``.fields`` attribute;
    * a ``tuple`` of ``DataField``.

    Every case returns a fresh list, so the result never aliases the
    sequence the caller passed in.

    Raises:
        ValueError: if ``row_type`` matches none of the above.
    """
    if isinstance(row_type, list):
        return list(row_type)
    fields: Optional[List[DataField]] = getattr(row_type, 'fields', None)
    if fields is not None:
        return list(fields)
    # Checked after ``.fields`` so a namedtuple that exposes a ``fields``
    # member is still treated as a row-like object, as it was before.
    if isinstance(row_type, tuple):
        return list(row_type)
    raise ValueError(
        "Projection target must be a RowType or have a .fields attribute, "
        "got %s" % type(row_type).__name__)
def is_row_type(data_type) -> bool:
    """Tell whether ``data_type`` can be descended into like a ROW.

    True for a ``RowType`` instance, and also for any other object whose
    ``fields`` attribute exists and is not None.
    """
    return (isinstance(data_type, RowType)
            or getattr(data_type, 'fields', None) is not None)