blob: 6c97f393aea06ca9ba95428961305095f9b64e0d [file]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""Structured explain result for Daft Paimon scans."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from pypaimon.read.explain import ExplainResult
# Reader-mode label for splits read by Daft's native Parquet reader
# (presumably the value stored in PaimonReaderSplitExplain.reader_mode — confirm with callers).
READER_MODE_NATIVE_PARQUET = "native_parquet"
# Reader-mode label for splits read through the pypaimon fallback reader
# (presumably the value stored in PaimonReaderSplitExplain.reader_mode — confirm with callers).
READER_MODE_PYPAIMON_FALLBACK = "pypaimon_fallback"
@dataclass(frozen=True, slots=True)
class PaimonReaderSplitExplain:
partition: dict[str, Any]
bucket: int
file_count: int
row_count: int
file_size: int
reader_mode: str
fallback_reason: str | None
file_paths: list[str]
@dataclass(frozen=True, slots=True)
class PaimonScanExplain:
paimon_scan: ExplainResult
native_parquet_split_count: int = 0
native_parquet_file_count: int = 0
pypaimon_fallback_split_count: int = 0
pypaimon_fallback_file_count: int = 0
fallback_reasons: dict[str, int] = field(default_factory=dict)
pushed_filters: list[str] = field(default_factory=list)
remaining_filters: list[str] = field(default_factory=list)
partition_filters: list[str] = field(default_factory=list)
requested_columns: list[str] | None = None
task_columns: list[str] | None = None
fallback_read_columns: list[str] | None = None
requested_limit: int | None = None
source_limit: int | None = None
limit_pushed: bool = False
splits: list[PaimonReaderSplitExplain] | None = None
@property
def total_split_count(self) -> int:
return self.native_parquet_split_count + self.pypaimon_fallback_split_count
@property
def total_file_count(self) -> int:
return self.native_parquet_file_count + self.pypaimon_fallback_file_count
def __str__(self) -> str:
return render_daft_paimon_explain(self)
def render_daft_paimon_explain(result: PaimonScanExplain) -> str:
    """Render ``result`` as human-readable, newline-separated explain text.

    Layout, top to bottom:

    * the ``== Daft Paimon Scan ==`` header;
    * one aligned ``key: value`` row per summary field;
    * a blank line plus a ``Splits:`` section, only when ``result.splits``
      is not ``None``;
    * a blank line followed by ``str(result.paimon_scan)`` with trailing
      whitespace stripped.
    """
    all_columns = "<all columns>"
    # (label, rendered value) pairs, emitted in this order.
    summary_rows = [
        ("Native Parquet splits", _count_files(
            result.native_parquet_split_count,
            result.native_parquet_file_count,
        )),
        ("pypaimon fallback splits", _count_files(
            result.pypaimon_fallback_split_count,
            result.pypaimon_fallback_file_count,
        )),
        ("Fallback reasons", _format_reason_counts(result.fallback_reasons)),
        ("Pushed filters", _format_list(result.pushed_filters)),
        ("Remaining filters", _format_list(result.remaining_filters)),
        ("Partition filters", _format_list(result.partition_filters)),
        ("Requested columns", _format_optional_list(result.requested_columns, all_columns)),
        ("Task columns", _format_optional_list(result.task_columns, all_columns)),
        ("Fallback read columns", _format_optional_list(result.fallback_read_columns, all_columns)),
        ("Limit", _format_limit(result)),
    ]

    lines: list[str] = ["== Daft Paimon Scan =="]
    for label, rendered in summary_rows:
        _line(lines, label, rendered)

    if result.splits is not None:
        lines += ["", "Splits:"]
        for position, split in enumerate(result.splits):
            reason = split.fallback_reason
            # Only splits with a recorded fallback reason get a "(reason)" tail.
            tail = "" if reason is None else f" ({reason})"
            lines.append(
                f" #{position} bucket={split.bucket} files={split.file_count}"
                f" rows={split.row_count} size={split.file_size}"
                f" mode={split.reader_mode}{tail}"
            )

    # Blank separator, then pypaimon's own explain text.
    lines += ["", str(result.paimon_scan).rstrip()]
    return "\n".join(lines)
def _line(out: list[str], key: str, value: str) -> None:
out.append("{:<28} {}".format(key + ":", value))
def _count_files(split_count: int, file_count: int) -> str:
return "{} ({} files)".format(split_count, file_count)
def _format_reason_counts(reasons: dict[str, int]) -> str:
if not reasons:
return "<none>"
return ", ".join("{}: {}".format(reason, count) for reason, count in sorted(reasons.items()))
def _format_list(values: list[str]) -> str:
if not values:
return "<none>"
return ", ".join(values)
def _format_optional_list(values: list[str] | None, empty: str) -> str:
if values is None:
return empty
if not values:
return "[]"
return "[{}]".format(", ".join(values))
def _format_limit(result: PaimonScanExplain) -> str:
if result.requested_limit is None:
return "<none>"
pushed = "pushed" if result.limit_pushed else "not pushed"
source = "<none>" if result.source_limit is None else str(result.source_limit)
return "requested {}, source {} ({})".format(result.requested_limit, source, pushed)