blob: e7c8c547d1914bb02ee722f13ccede51b9d4c751 [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from __future__ import annotations
from dataclasses import dataclass, replace
import logging
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
import daft
from daft.dependencies import pa
from daft.expressions import ExpressionsProjection
from daft.io.partitioning import PartitionField
from daft.io.source import DataSource, DataSourceTask
from daft.logical.schema import Schema
from daft.recordbatch import RecordBatch
from pypaimon.daft.daft_compat import require_file_range_reads
from pypaimon.daft.daft_explain import (
PaimonReaderSplitExplain,
PaimonScanExplain,
READER_MODE_NATIVE_PARQUET,
READER_MODE_PYPAIMON_FALLBACK,
)
from pypaimon.daft.daft_predicate_visitor import convert_filters_to_paimon
if TYPE_CHECKING:
from collections.abc import AsyncIterator
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.explain import ExplainSplitInfo
from pypaimon.read.split import Split
from pypaimon.table.file_store_table import FileStoreTable
from daft.daft import PyExpr, StorageConfig
from daft.io.pushdowns import Pushdowns
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# File-format names as lower-case strings; the table's configured format is
# lower-cased before it is compared against these.
PAIMON_FILE_FORMAT_PARQUET = "parquet"
PAIMON_FILE_FORMAT_ORC = "orc"
PAIMON_FILE_FORMAT_AVRO = "avro"
# Plain-tuple form of a table identifier: (database name, table name, branch name or None).
_PaimonIdentifier = tuple[str, str, str | None]
@dataclass(frozen=True, slots=True)
class _ReadPushdownState:
    """Immutable bundle of the pushdown decisions computed for one scan."""

    # Predicate handed to the pypaimon fallback reader task (None = no predicate).
    reader_predicate: Predicate | None
    # Predicate applied to the scan read builder; None when it is not used for planning.
    planning_predicate: Predicate | None
    # Requested output columns that exist in the source schema (None = no projection).
    requested_columns: list[str] | None
    # requested_columns plus any columns reported as required by the pushdown filters.
    task_columns: list[str] | None
    # task_columns plus the fields referenced by the reader predicate.
    read_columns: list[str] | None
    # Row limit pushed into the source, or None when the limit is not pushed down.
    source_limit: int | None
@dataclass(frozen=True, slots=True)
class _ReaderRouting:
    """Reader chosen for a split, plus the reason when the fallback is used."""

    reader_mode: str
    fallback_reason: str | None

    @property
    def use_native_reader(self) -> bool:
        """Whether ``reader_mode`` selects the native Parquet reader."""
        native_selected = self.reader_mode == READER_MODE_NATIVE_PARQUET
        return native_selected
def _options_to_dict(options: Any) -> dict[str, Any]:
if options is None:
return {}
if isinstance(options, dict):
return dict(options)
return dict(options.to_map())
def _extract_catalog_options(table: FileStoreTable) -> dict[str, Any]:
    """Copy the catalog properties exposed by the table's FileIO into a dict."""
    # Every FileIO exposes catalog properties via ``properties`` (CachingFileIO
    # delegates to its wrapped FileIO), so no per-implementation handling needed.
    file_io = table.file_io
    return _options_to_dict(file_io.properties)
def _extract_identifier(table: FileStoreTable) -> _PaimonIdentifier | None:
identifier = table.identifier
if identifier is None:
return None
database_name = identifier.get_database_name()
table_name = identifier.get_table_name()
if database_name is None or table_name is None:
return None
return database_name, table_name, identifier.get_branch_name()
def _extract_table_options(table: FileStoreTable) -> dict[str, Any]:
    """Copy the options of the table's schema into a plain dict."""
    table_schema = table.schema()
    return _options_to_dict(table_schema.options)
def _to_paimon_identifier(identifier: _PaimonIdentifier) -> Any:
database_name, table_name, branch_name = identifier
if branch_name:
from pypaimon.common.identifier import Identifier
return Identifier(database_name, table_name, branch_name)
return f"{database_name}.{table_name}"
def _load_table(
catalog_options: dict[str, Any],
table_identifier: _PaimonIdentifier | None,
table_path: str | None,
table_options: dict[str, Any],
) -> FileStoreTable:
if catalog_options and table_identifier is not None:
from pypaimon.catalog.catalog_factory import CatalogFactory
catalog = CatalogFactory.create(catalog_options)
table = catalog.get_table(_to_paimon_identifier(table_identifier))
elif table_path:
from pypaimon.table.file_store_table import FileStoreTable
table = FileStoreTable.from_path(table_path)
else:
raise RuntimeError(
"Unable to reconstruct Paimon table while deserializing PaimonDataSource."
)
if table_options:
table = table.copy(table_options)
return table
def _build_storage_config(
    catalog_options: dict[str, Any],
    multithreaded_io: bool,
) -> StorageConfig:
    """Build a Daft ``StorageConfig`` from Paimon catalog options.

    The IO config is derived from ``catalog_options``; when that conversion
    yields a falsy result, the default IO config of the current Daft planning
    config is used instead.
    """
    from daft import context
    from daft.daft import StorageConfig
    from pypaimon.daft.daft_io_config import _convert_paimon_catalog_options_to_io_config

    io_config = _convert_paimon_catalog_options_to_io_config(catalog_options)
    if not io_config:
        io_config = context.get_context().daft_planning_config.default_io_config
    return StorageConfig(multithreaded_io, io_config)
class _PaimonPKSplitTask(DataSourceTask):
    """Task that reads one Paimon split through pypaimon's own reader.

    Intended for PK-table splits that need an LSM-tree merge
    (``split.raw_convertible`` is False because overlapping levels exist) and
    for file formats other than Parquet (ORC, Avro). The merge is handled
    inside pypaimon's reader.

    The task stores only the pieces needed to re-open the table (catalog
    options, identifier, path, table options) and loads the table in ``read``.
    """

    def __init__(
        self,
        table_catalog_options: dict[str, Any],
        table_identifier: _PaimonIdentifier | None,
        table_path: str | None,
        table_options: dict[str, Any],
        split: Split,
        schema: Schema,
        read_columns: list[str] | None = None,
        limit: int | None = None,
        predicate: Predicate | None = None,
        output_columns: list[str] | None = None,
        blob_column_names: set[str] | None = None,
    ) -> None:
        # What to read.
        self._split = split
        self._schema = schema
        # How to re-open the table.
        self._table_catalog_options = table_catalog_options
        self._table_identifier = table_identifier
        self._table_path = table_path
        self._table_options = table_options
        # Pushdowns applied to the pypaimon read builder / produced batches.
        self._read_columns = read_columns
        self._limit = limit
        self._predicate = predicate
        self._output_columns = output_columns
        self._blob_column_names = blob_column_names if blob_column_names else set()

    @property
    def schema(self) -> Schema:
        """Schema given to this task at construction."""
        return self._schema

    async def read(self) -> AsyncIterator[RecordBatch]:
        """Yield the split's data as Daft ``RecordBatch`` objects.

        Projection, limit and predicate are applied on the pypaimon read
        builder when set. Each Arrow batch is then narrowed to
        ``output_columns`` (when set) and, if blob columns are configured,
        converted to the File layout and cast to the File dtype.
        """
        paimon_table = _load_table(
            self._table_catalog_options,
            self._table_identifier,
            self._table_path,
            self._table_options,
        )

        builder = paimon_table.new_read_builder()
        if self._read_columns is not None:
            builder = builder.with_projection(self._read_columns)
        if self._limit is not None:
            builder = builder.with_limit(self._limit)
        if self._predicate is not None:
            builder = builder.with_filter(self._predicate)
        batch_reader = builder.new_read().to_arrow_batch_reader([self._split])

        blob_names = self._blob_column_names
        output_columns = self._output_columns
        # read_next_batch() returning None marks the end of the stream.
        for arrow_batch in iter(batch_reader.read_next_batch, None):
            if output_columns is not None:
                arrow_batch = arrow_batch.select(output_columns)
            if not blob_names:
                yield RecordBatch.from_arrow_record_batches([arrow_batch], arrow_batch.schema)
                continue
            arrow_batch = _convert_blob_columns(arrow_batch, blob_names)
            record_batch = RecordBatch.from_arrow_record_batches([arrow_batch], arrow_batch.schema)
            yield _cast_blob_columns_to_file(record_batch, blob_names)
def _convert_blob_columns(batch: pa.RecordBatch, blob_column_names: set[str]) -> pa.RecordBatch:
    """Replace serialized BlobDescriptor columns with the File physical struct layout.

    Only columns named in ``blob_column_names`` whose Arrow type is binary or
    large_binary are converted; every other column is passed through as-is.
    """
    from pypaimon.daft.daft_blob import FILE_PHYSICAL_TYPE, blob_column_to_file_array

    def _is_serialized_blob(field: pa.Field) -> bool:
        if field.name not in blob_column_names:
            return False
        return pa.types.is_large_binary(field.type) or pa.types.is_binary(field.type)

    out_arrays = []
    out_fields = []
    for index, field in enumerate(batch.schema):
        column = batch.column(index)
        if _is_serialized_blob(field):
            column = blob_column_to_file_array(column)
            # Keep the name and nullability, swap only the type.
            field = pa.field(field.name, FILE_PHYSICAL_TYPE, nullable=field.nullable)
        out_arrays.append(column)
        out_fields.append(field)
    return pa.RecordBatch.from_arrays(out_arrays, schema=pa.schema(out_fields))
def _cast_blob_columns_to_file(rb: RecordBatch, blob_column_names: set[str]) -> RecordBatch:
    """Cast the blob columns of a RecordBatch to ``DataType.file()``.

    Columns not named in ``blob_column_names`` are carried over unchanged; the
    result is rebuilt from a name -> column mapping in schema order.
    """
    from daft.datatype import DataType

    file_dtype = DataType.file()
    casted = {}
    for position, field in enumerate(rb.schema()):
        series = rb.get_column(position)
        casted[field.name] = series.cast(file_dtype) if field.name in blob_column_names else series
    return RecordBatch.from_pydict(casted)
class PaimonDataSource(DataSource):
    """DataSource for Apache Paimon tables.

    Uses pypaimon for catalog metadata and scan planning (file listing,
    partition pruning, statistics-based file skipping), then yields
    DataSourceTask objects executed by Daft's native Parquet reader.

    For primary-key tables whose splits cannot be read directly without an
    LSM-tree merge, a _PaimonPKSplitTask is yielded which delegates back
    to pypaimon's native reader.

    Instances are picklable: only plain options, the table identifier/path and
    the filter-pushdown state are stored, and the table and storage config are
    rebuilt on unpickling.
    """

    def __init__(
        self,
        table: FileStoreTable,
        storage_config: StorageConfig,
        catalog_options: dict[str, str] | None,
    ) -> None:
        """Create a data source for ``table``.

        Args:
            table: The Paimon table to read.
            storage_config: Daft storage config passed to native Parquet tasks.
            catalog_options: Catalog options supplied by the caller; they take
                precedence over the options extracted from the table's FileIO.
                A falsy value is treated as empty.
        """
        self._storage_config = storage_config
        self._catalog_options = dict(catalog_options or {})
        # Caller-supplied options override those discovered on the table.
        self._table_catalog_options = {
            **_extract_catalog_options(table),
            **self._catalog_options,
        }
        self._table_identifier = _extract_identifier(table)
        table_path = getattr(table, "table_path", None)
        self._table_path = str(table_path) if table_path is not None else None
        self._table_options = _extract_table_options(table)
        # _init_table() also initialises the filter-pushdown attributes to None.
        self._init_table(table)

    def __getstate__(self) -> dict[str, Any]:
        """Return the picklable state.

        Neither the table object nor the ``StorageConfig`` is stored; only the
        values needed to rebuild them in ``__setstate__``.
        """
        return {
            "_multithreaded_io": self._storage_config.multithreaded_io,
            "_catalog_options": self._catalog_options,
            "_table_catalog_options": self._table_catalog_options,
            "_table_identifier": self._table_identifier,
            "_table_path": self._table_path,
            "_table_options": self._table_options,
            "_pushed_filters": self._pushed_filters,
            "_paimon_predicate": self._paimon_predicate,
            "_remaining_filters": self._remaining_filters,
        }

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Rebuild the data source from pickled ``state``.

        The storage config and the table are re-created from the stored
        options, then the filter-pushdown state is restored.

        Raises:
            RuntimeError: if the table cannot be reconstructed (see ``_load_table``).
        """
        self._catalog_options = state["_catalog_options"]
        self._table_catalog_options = state["_table_catalog_options"]
        self._table_identifier = state["_table_identifier"]
        self._table_path = state["_table_path"]
        self._table_options = state["_table_options"]
        self._storage_config = _build_storage_config(
            self._table_catalog_options,
            state["_multithreaded_io"],
        )
        table = _load_table(
            self._table_catalog_options,
            self._table_identifier,
            self._table_path,
            self._table_options,
        )
        self._init_table(table)
        # _init_table() resets the filter-pushdown attributes to None, so the
        # pickled values must be restored AFTER it runs; restoring them before
        # would silently discard the pushed-down predicate.
        # ``.get`` tolerates state produced without the "_pushed_filters" key.
        self._pushed_filters = state.get("_pushed_filters")
        self._paimon_predicate = state["_paimon_predicate"]
        self._remaining_filters = state["_remaining_filters"]

    def _init_table(self, table: FileStoreTable) -> None:
        """Bind ``table`` and derive everything computed from it.

        Sets the Daft schema (large_binary columns are treated as blob columns
        and exposed with the File dtype), the warehouse URI scheme, the file
        format flags and the Arrow types of the partition fields. Also resets
        the filter-pushdown attributes to ``None``.
        """
        self._table = table
        from pypaimon.schema.data_types import PyarrowFieldParser

        pa_schema = PyarrowFieldParser.from_paimon_schema(table.fields)
        self._blob_column_names: set[str] = {
            field.name for field in pa_schema if pa.types.is_large_binary(field.type)
        }
        self._has_blob_columns = bool(self._blob_column_names)
        if self._has_blob_columns:
            require_file_range_reads()
            from daft.datatype import DataType

            base_schema = Schema.from_pyarrow_schema(pa_schema)
            fields = []
            for f in base_schema:
                if f.name in self._blob_column_names:
                    fields.append((f.name, DataType.file()))
                else:
                    fields.append((f.name, f.dtype))
            self._schema = Schema.from_field_name_and_types(fields)
        else:
            self._schema = Schema.from_pyarrow_schema(pa_schema)

        warehouse = (
            self._catalog_options.get("warehouse")
            or self._table_catalog_options.get("warehouse")
            or ""
        )
        # Used by _build_file_uri() to re-attach a scheme to scheme-less paths.
        self._warehouse_scheme = urlparse(warehouse).scheme
        self._file_format = table.options.file_format().lower()
        self._is_parquet = self._file_format == PAIMON_FILE_FORMAT_PARQUET
        self._partition_field_arrow_types: dict[str, pa.DataType] = (
            {f.name: PyarrowFieldParser.from_paimon_type(f.type) for f in table.partition_keys_fields}
            if table.partition_keys
            else {}
        )
        self._pushed_filters: list[PyExpr] | None = None
        self._paimon_predicate: Predicate | None = None
        self._remaining_filters: list[PyExpr] | None = None

    @property
    def name(self) -> str:
        """Display name of the form ``PaimonDataSource(<table_path>)``."""
        table_path = getattr(self._table, "table_path", None)
        return f"PaimonDataSource({table_path})"

    @property
    def schema(self) -> Schema:
        """Daft schema of the table (blob columns use the File dtype)."""
        return self._schema

    def get_partition_fields(self) -> list[PartitionField]:
        """Return a PartitionField for every schema field that is a partition key."""
        partition_key_names = set(self._table.partition_keys)
        return [PartitionField.create(f) for f in self._schema if f.name in partition_key_names]

    def push_filters(self, filters: list[PyExpr]) -> tuple[list[PyExpr], list[PyExpr]]:
        """Push filters down to Paimon scan.

        Converts Daft expressions to Paimon predicates where possible and
        remembers the outcome on the instance for later scans.

        Returns (pushed_filters, remaining_filters).
        """
        pushed_filters, remaining_filters, paimon_predicate = convert_filters_to_paimon(self._table, filters)
        self._pushed_filters = pushed_filters
        self._paimon_predicate = paimon_predicate
        self._remaining_filters = remaining_filters
        if pushed_filters:
            logger.debug(
                "Paimon filter pushdown: %d filters pushed, %d remaining",
                len(pushed_filters),
                len(remaining_filters),
            )
        return pushed_filters, remaining_filters

    def _read_table_for_scan(self) -> FileStoreTable:
        """Return the table to scan; with blob columns, a copy with ``blob-as-descriptor`` enabled."""
        if self._has_blob_columns:
            return self._table.copy({"blob-as-descriptor": "true"})
        return self._table

    def _scan_read_builder(
        self,
        table: FileStoreTable,
        read_pushdowns: _ReadPushdownState,
    ) -> Any:
        """Create a read builder on ``table`` with projection, limit and planning predicate applied."""
        read_builder = table.new_read_builder()
        if read_pushdowns.requested_columns is not None:
            read_builder = read_builder.with_projection(read_pushdowns.requested_columns)
        if read_pushdowns.source_limit is not None:
            read_builder = read_builder.with_limit(read_pushdowns.source_limit)
        if read_pushdowns.planning_predicate is not None:
            read_builder = read_builder.with_filter(read_pushdowns.planning_predicate)
            logger.debug(
                "Applied Paimon filter pushdown predicate: %s",
                read_pushdowns.planning_predicate,
            )
        return read_builder

    async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]:
        """Plan the scan and yield one task per unit of work.

        Splits rejected by the partition filter are skipped. A split routed to
        the native reader yields one Parquet task per data file; any other
        split yields a single ``_PaimonPKSplitTask``.
        """
        read_table = self._read_table_for_scan()
        read_pushdowns = self._read_pushdown_state(read_table, pushdowns)
        read_builder = self._scan_read_builder(read_table, read_pushdowns)
        if self._table.partition_keys and pushdowns.partition_filters is None:
            logger.warning(
                "%s has partition keys %s but no partition filter was specified. "
                "This will result in a full table scan.",
                self.name,
                list(self._table.partition_keys),
            )
        plan = read_builder.new_scan().plan()
        # Partition-value batches are shared between splits of the same partition.
        pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None] = {}
        for split in plan.splits():
            if self._partition_filter_skips_split(split, pushdowns, pv_cache):
                continue
            routing = self._reader_routing(
                raw_convertible=split.raw_convertible,
                has_deletion_vectors=self._split_has_deletion_vectors(split),
            )
            if routing.use_native_reader:
                pv = None
                if self._table.partition_keys:
                    pv = self._partition_values(split, pv_cache)
                for data_file in split.files:
                    file_uri = self._build_file_uri(self._data_file_path(data_file))
                    yield DataSourceTask.parquet(
                        path=file_uri,
                        schema=self._schema,
                        pushdowns=pushdowns,
                        num_rows=data_file.row_count,
                        size_bytes=data_file.file_size,
                        partition_values=pv,
                        storage_config=self._storage_config,
                    )
            else:
                logger.debug(
                    "Split with %d files using pypaimon fallback (%s).",
                    len(split.files),
                    routing.fallback_reason,
                )
                yield _PaimonPKSplitTask(
                    self._table_catalog_options,
                    self._table_identifier,
                    self._table_path,
                    _extract_table_options(read_table),
                    split,
                    self._project_schema(read_pushdowns.task_columns),
                    read_pushdowns.read_columns,
                    read_pushdowns.source_limit,
                    read_pushdowns.reader_predicate,
                    read_pushdowns.task_columns,
                    self._blob_column_names,
                )

    def explain_scan(self, pushdowns: Pushdowns, verbose: bool = False) -> PaimonScanExplain:
        """Describe how a scan with ``pushdowns`` would be executed.

        Counts splits and files per reader mode, tallies fallback reasons, and
        reports the effective filters, columns and limit. Per-split details are
        included only when ``verbose`` is true.
        """
        read_table = self._read_table_for_scan()
        read_pushdowns = self._read_pushdown_state(read_table, pushdowns)
        read_builder = self._scan_read_builder(read_table, read_pushdowns)
        # Split details are always requested; they are needed for the counts below.
        paimon_scan = read_builder.explain(verbose=True)
        split_details = paimon_scan.splits or []
        native_split_count = 0
        native_file_count = 0
        fallback_split_count = 0
        fallback_file_count = 0
        fallback_reasons: dict[str, int] = {}
        explained_splits: list[PaimonReaderSplitExplain] | None = [] if verbose else None
        pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None] = {}
        for split in split_details:
            if self._partition_filter_skips_explain_split(split, pushdowns, pv_cache):
                continue
            routing = self._reader_routing(
                raw_convertible=split.raw_convertible,
                has_deletion_vectors=split.has_deletion_vectors,
            )
            if routing.use_native_reader:
                native_split_count += 1
                native_file_count += split.file_count
            else:
                fallback_split_count += 1
                fallback_file_count += split.file_count
                reason = routing.fallback_reason or "unknown"
                fallback_reasons[reason] = fallback_reasons.get(reason, 0) + 1
            if explained_splits is not None:
                explained_splits.append(
                    PaimonReaderSplitExplain(
                        partition=split.partition,
                        bucket=split.bucket,
                        file_count=split.file_count,
                        row_count=split.row_count,
                        file_size=split.file_size,
                        reader_mode=routing.reader_mode,
                        fallback_reason=routing.fallback_reason,
                        file_paths=split.file_paths,
                    )
                )
        if not verbose:
            # Drop the per-split details from the non-verbose result.
            paimon_scan = replace(paimon_scan, splits=None)
        pushed_filters, remaining_filters = self._filter_pushdown_explain(pushdowns)
        return PaimonScanExplain(
            paimon_scan=paimon_scan,
            native_parquet_split_count=native_split_count,
            native_parquet_file_count=native_file_count,
            pypaimon_fallback_split_count=fallback_split_count,
            pypaimon_fallback_file_count=fallback_file_count,
            fallback_reasons=fallback_reasons,
            pushed_filters=pushed_filters,
            remaining_filters=remaining_filters,
            partition_filters=self._format_partition_filters(pushdowns),
            requested_columns=read_pushdowns.requested_columns,
            task_columns=read_pushdowns.task_columns,
            fallback_read_columns=read_pushdowns.read_columns,
            requested_limit=pushdowns.limit,
            source_limit=read_pushdowns.source_limit,
            limit_pushed=pushdowns.limit is not None and read_pushdowns.source_limit == pushdowns.limit,
            splits=explained_splits,
        )

    def _reader_routing(
        self,
        raw_convertible: bool,
        has_deletion_vectors: bool,
    ) -> _ReaderRouting:
        """Choose the reader for a split.

        The native Parquet reader is used only when the table is Parquet, has
        no blob columns, the split has no deletion vectors, and the table is
        either not a primary-key table or the split is raw-convertible.
        Otherwise the pypaimon fallback is selected together with a reason.
        """
        can_use_native_reader = (
            self._is_parquet
            and not self._has_blob_columns
            and (not self._table.is_primary_key_table or raw_convertible)
            and not has_deletion_vectors
        )
        if can_use_native_reader:
            return _ReaderRouting(READER_MODE_NATIVE_PARQUET, None)
        # Report the first applicable reason, in this fixed order.
        if not self._is_parquet:
            reason = "non-parquet format"
        elif self._has_blob_columns:
            reason = "blob columns present"
        elif has_deletion_vectors:
            reason = "deletion vectors present"
        else:
            reason = "LSM merge required"
        return _ReaderRouting(READER_MODE_PYPAIMON_FALLBACK, reason)

    @staticmethod
    def _split_has_deletion_vectors(split: Split) -> bool:
        """Whether ``split.data_deletion_files`` exists and holds at least one non-None entry."""
        deletion_files = getattr(split, "data_deletion_files", None)
        return deletion_files is not None and any(df is not None for df in deletion_files)

    def _partition_filter_skips_split(
        self,
        split: Split,
        pushdowns: Pushdowns,
        pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None],
    ) -> bool:
        """Whether the partition filter excludes a planned split."""
        if not self._table.partition_keys or pushdowns.partition_filters is None:
            return False
        pv = self._partition_values(split, pv_cache)
        return self._partition_filter_skips_values(pv, pushdowns)

    def _partition_filter_skips_explain_split(
        self,
        split: ExplainSplitInfo,
        pushdowns: Pushdowns,
        pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None],
    ) -> bool:
        """Whether the partition filter excludes a split described by explain output."""
        if not self._table.partition_keys or pushdowns.partition_filters is None:
            return False
        pv = self._partition_values_from_dict(split.partition, pv_cache)
        return self._partition_filter_skips_values(pv, pushdowns)

    @staticmethod
    def _partition_filter_skips_values(
        partition_values: RecordBatch | None,
        pushdowns: Pushdowns,
    ) -> bool:
        """Whether filtering the one-row partition batch leaves no rows.

        A missing batch (``None``) never causes a skip.
        """
        return (
            partition_values is not None
            and len(partition_values.filter(ExpressionsProjection([pushdowns.partition_filters]))) == 0
        )

    def _format_partition_filters(self, pushdowns: Pushdowns) -> list[str]:
        """Render the partition filter as a list of strings (empty when there is none)."""
        if pushdowns.partition_filters is None:
            return []
        return self._format_pyexprs([getattr(pushdowns.partition_filters, "_expr", pushdowns.partition_filters)])

    def _filter_pushdown_explain(self, pushdowns: Pushdowns) -> tuple[list[str], list[str]]:
        """Return ``(pushed, remaining)`` filters rendered as strings.

        Uses the outcome recorded by ``push_filters`` when available; otherwise
        converts ``pushdowns.filters`` on the fly.
        """
        if self._remaining_filters is not None:
            return (
                self._format_pyexprs(self._pushed_filters or []),
                self._format_pyexprs(self._remaining_filters),
            )
        if pushdowns.filters is None:
            return [], []
        py_expr = getattr(pushdowns.filters, "_expr", pushdowns.filters)
        pushed_filters, remaining_filters, _ = convert_filters_to_paimon(self._table, [py_expr])
        return self._format_pyexprs(pushed_filters), self._format_pyexprs(remaining_filters)

    @staticmethod
    def _format_pyexprs(py_exprs: list[PyExpr]) -> list[str]:
        """Render each expression as a string for explain output."""
        from daft.expressions import Expression

        result = []
        for py_expr in py_exprs:
            try:
                result.append(str(Expression._from_pyexpr(py_expr)))
            except Exception:
                # Best effort: explain output must not fail on formatting, so
                # fall back to the raw object's string form.
                result.append(str(py_expr))
        return result

    def _build_file_uri(self, file_path: str) -> str:
        """Reconstruct a full URI from a (potentially scheme-stripped) file_path.

        A path that already has a scheme is returned unchanged; otherwise the
        warehouse scheme is prepended, or ``file://`` when none is known.
        """
        if urlparse(file_path).scheme:
            return file_path
        if self._warehouse_scheme:
            return f"{self._warehouse_scheme}://{file_path}"
        return f"file://{file_path}"

    @staticmethod
    def _data_file_path(data_file: DataFileMeta) -> str:
        """Return the file's external path when set, else its regular file path."""
        return data_file.external_path if data_file.external_path else data_file.file_path

    def _build_partition_values(self, split: Split) -> daft.recordbatch.RecordBatch | None:
        """Build a single-row RecordBatch encoding the partition values for a split."""
        return self._build_partition_values_from_dict(split.partition.to_dict())

    def _partition_values(
        self,
        split: Split,
        pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None],
    ) -> RecordBatch | None:
        """Cached variant of ``_build_partition_values`` for a split."""
        return self._partition_values_from_dict(split.partition.to_dict(), pv_cache)

    def _partition_values_from_dict(
        self,
        partition_dict: dict[str, Any],
        pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None],
    ) -> RecordBatch | None:
        """Return the partition-value batch for ``partition_dict``, building it at most once per cache."""
        # Sorted items give a hashable key that is independent of dict order.
        pv_key = tuple(sorted(partition_dict.items()))
        if pv_key not in pv_cache:
            pv_cache[pv_key] = self._build_partition_values_from_dict(partition_dict)
        return pv_cache[pv_key]

    def _build_partition_values_from_dict(self, partition_dict: dict[str, Any]) -> daft.recordbatch.RecordBatch | None:
        """Build a single-row RecordBatch from a partition name -> value mapping.

        Returns ``None`` for tables without partition keys. Partition fields
        missing from ``partition_dict`` are filled with ``None``.
        """
        if not self._table.partition_keys:
            return None
        arrays: dict[str, daft.Series] = {}
        for pfield in self._table.partition_keys_fields:
            value = partition_dict.get(pfield.name)
            arrow_type = self._partition_field_arrow_types[pfield.name]
            arrays[pfield.name] = daft.Series.from_arrow(pa.array([value], type=arrow_type), name=pfield.name)
        if not arrays:
            return None
        return daft.recordbatch.RecordBatch.from_pydict(arrays)

    def _valid_output_columns(self, columns: list[str] | None) -> list[str] | None:
        """Keep only the requested columns that exist in the schema (``None`` passes through)."""
        if columns is None:
            return None
        schema_names = {field.name for field in self._schema}
        return [name for name in columns if name in schema_names]

    def _task_columns(
        self,
        table: FileStoreTable,
        output_columns: list[str] | None,
        pushdowns: Pushdowns,
    ) -> list[str] | None:
        """Return the output columns extended with columns the filters need.

        The extra columns come from ``pushdowns.filter_required_column_names()``
        when that method exists. ``None`` passes through.
        """
        if output_columns is None:
            return None
        task_columns = list(output_columns)
        filter_required_column_names = getattr(pushdowns, "filter_required_column_names", None)
        required_fields = filter_required_column_names() if filter_required_column_names else set()
        return self._append_existing_columns(table, task_columns, required_fields)

    def _fallback_read_columns(
        self,
        table: FileStoreTable,
        task_columns: list[str] | None,
        paimon_predicate: Predicate | None,
    ) -> list[str] | None:
        """Return the task columns extended with the fields the predicate references.

        This is the projection given to the pypaimon fallback reader.
        ``None`` passes through.
        """
        if task_columns is None:
            return None
        read_columns = list(task_columns)
        if paimon_predicate is not None:
            from pypaimon.read.push_down_utils import _get_all_fields

            return self._append_existing_columns(table, read_columns, _get_all_fields(paimon_predicate))
        return read_columns

    @staticmethod
    def _append_existing_columns(
        table: FileStoreTable,
        columns: list[str],
        required_fields: set[str],
    ) -> list[str]:
        """Append to ``columns`` the required table fields not already listed.

        ``columns`` is extended in place and returned; appended names follow
        the order of ``table.fields``. Names absent from the table are ignored.
        """
        if not required_fields:
            return columns
        existing = set(columns)
        columns.extend(
            field.name
            for field in table.fields
            if field.name in required_fields and field.name not in existing
        )
        return columns

    def _project_schema(self, columns: list[str] | None) -> Schema:
        """Return the schema restricted to ``columns`` in that order (full schema for ``None``)."""
        if columns is None:
            return self._schema
        field_map = {field.name: field for field in self._schema}
        return Schema.from_field_name_and_types(
            [(name, field_map[name].dtype) for name in columns if name in field_map]
        )

    def _read_pushdown_state(
        self,
        table: FileStoreTable,
        pushdowns: Pushdowns,
    ) -> _ReadPushdownState:
        """Compute predicates, column lists and limit for a scan with ``pushdowns``."""
        reader_predicate, filters_consumed = self._pushdown_filter_state(pushdowns)
        planning_predicate = self._planning_predicate(reader_predicate)
        requested_columns = self._valid_output_columns(pushdowns.columns)
        task_columns = self._task_columns(table, requested_columns, pushdowns)
        read_columns = self._fallback_read_columns(table, task_columns, reader_predicate)
        source_limit = self._source_limit(
            pushdowns,
            reader_predicate,
            planning_predicate,
            filters_consumed,
        )
        return _ReadPushdownState(
            reader_predicate=reader_predicate,
            planning_predicate=planning_predicate,
            requested_columns=requested_columns,
            task_columns=task_columns,
            read_columns=read_columns,
            source_limit=source_limit,
        )

    def _pushdown_filter_state(self, pushdowns: Pushdowns) -> tuple[Predicate | None, bool]:
        """Return ``(predicate, filters_consumed)`` for the scan.

        ``filters_consumed`` is true when no filter is left for Daft to apply.
        The outcome recorded by ``push_filters`` is used when available;
        otherwise ``pushdowns.filters`` is converted on the fly.
        """
        if self._remaining_filters is not None:
            return self._paimon_predicate, not self._remaining_filters
        if pushdowns.filters is None:
            return None, True
        py_expr = getattr(pushdowns.filters, "_expr", pushdowns.filters)
        _, remaining_filters, paimon_predicate = convert_filters_to_paimon(self._table, [py_expr])
        return paimon_predicate, not remaining_filters

    def _planning_predicate(self, pushdown_predicate: Predicate | None) -> Predicate | None:
        """Return the predicate to apply during scan planning, or ``None``.

        The predicate is used only when it is safe to plan with and either a
        predicate was recorded by ``push_filters`` or the table requires the
        fallback reader.
        """
        if pushdown_predicate is None:
            return None
        if not self._can_plan_predicate(pushdown_predicate):
            return None
        if self._paimon_predicate is not None or self._requires_fallback_reader():
            return pushdown_predicate
        return None

    @staticmethod
    def _source_limit(
        pushdowns: Pushdowns,
        reader_predicate: Predicate | None,
        planning_predicate: Predicate | None,
        filters_consumed: bool,
    ) -> int | None:
        """Return the limit to push into the source, or ``None`` to not push it.

        The limit is withheld when there is a partition filter, when some
        filters remain unconsumed, or when a reader predicate exists that is
        not applied at planning time.
        """
        if pushdowns.limit is None:
            return None
        if pushdowns.partition_filters is not None:
            return None
        if not filters_consumed:
            return None
        if reader_predicate is not None and planning_predicate is None:
            return None
        return pushdowns.limit

    def _requires_fallback_reader(self) -> bool:
        """Whether the table is non-Parquet, has blob columns, or is a primary-key table."""
        return not self._is_parquet or self._has_blob_columns or self._table.is_primary_key_table

    def _can_plan_predicate(self, predicate: Predicate) -> bool:
        """Whether ``predicate`` may be used for scan planning.

        Predicates without ``isNull`` always qualify; those containing it
        qualify only for primary-key tables with deletion vectors disabled.
        """
        # Missing value null-count stats make isNull unsafe for scan planning.
        if not self._predicate_contains_is_null(predicate):
            return True
        return self._table.is_primary_key_table and not self._table.options.deletion_vectors_enabled()

    def _predicate_contains_is_null(self, predicate: Predicate) -> bool:
        """Whether ``predicate`` is, or contains under ``and``/``or``, an ``isNull`` test."""
        if predicate.method == "isNull":
            return True
        if predicate.method in ("and", "or"):
            # For compound predicates the children are stored in ``literals``.
            return any(self._predicate_contains_is_null(child) for child in predicate.literals or [])
        return False