blob: 13db6d41571c1eddb413931ea8f338ecddc6a1f6 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class PruningStat:
"""Before / after counters for one pruning stage.
``before`` is the input size to the stage, ``after`` is the size that
survived. Either may be ``None`` when the stage did not run (for
example, ``bucket_pruning`` is ``None`` for tables that are not
HASH_FIXED with all bucket keys pinned).
"""
before: Optional[int]
after: Optional[int]
@property
def pruned(self) -> Optional[int]:
if self.before is None or self.after is None:
return None
return self.before - self.after
def format(self) -> str:
if self.before is None and self.after is None:
return "n/a"
before = "?" if self.before is None else str(self.before)
after = "?" if self.after is None else str(self.after)
pruned = self.pruned
suffix = "" if pruned is None else " (pruned {})".format(pruned)
return "{} -> {}{}".format(before, after, suffix)
@dataclass
class ExplainSplitInfo:
"""Per-split detail surfaced when ``explain(verbose=True)`` is used."""
partition: Dict[str, Any]
bucket: int
file_count: int
row_count: int
merged_row_count: Optional[int]
file_size: int
raw_convertible: bool
has_deletion_vectors: bool
level_histogram: Dict[int, int]
deletion_file_count: int
file_paths: List[str]
@dataclass
class ExplainResult:
"""Structured scan plan returned by ``ReadBuilder.explain()``.
The compact ``__str__`` shows enough signal to reason about cost: the
snapshot, the pushed-down predicate / projection / limit, the three
pruning before-after counters, split-level shape (raw-convertible
ratio, DV ratio, all-above-L0 ratio, files/split, size/split
distribution), and the file-level totals. ``verbose=True`` adds a
block listing each split.
"""
# Identity
table_identifier: str
is_primary_key_table: bool
bucket_mode: str
deletion_vectors_enabled: bool
data_evolution_enabled: bool
# Snapshot
snapshot_id: Optional[int]
schema_id: Optional[int]
# Pushdown
predicate: Optional[str] = None
projection: Optional[List[str]] = None
limit: Optional[int] = None
# Pruning (None when not applicable)
partition_pruning: Optional[PruningStat] = None
bucket_pruning: Optional[PruningStat] = None
file_skipping: Optional[PruningStat] = None
# File-level aggregates over final splits
file_count: int = 0
total_file_size: int = 0
estimated_row_count: int = 0
estimated_merged_row_count: Optional[int] = None
deletion_file_count: int = 0
level_histogram: Dict[int, int] = field(default_factory=dict)
# Split-level aggregates (shown in compact mode too)
split_count: int = 0
splits_raw_convertible: int = 0
splits_with_deletion_vectors: int = 0
splits_all_above_l0: int = 0
files_per_split_min: int = 0
files_per_split_max: int = 0
files_per_split_avg: float = 0.0
split_size_min: int = 0
split_size_max: int = 0
split_size_avg: float = 0.0
split_size_p50: int = 0
split_size_p95: int = 0
# Verbose-only
splits: Optional[List[ExplainSplitInfo]] = None
def __str__(self) -> str:
return render_explain(self)
# ---------------------------------------------------------------------------
# Pretty-print helpers
# ---------------------------------------------------------------------------
def render_explain(result: ExplainResult) -> str:
out = io.StringIO()
out.write("== PyPaimon Scan Plan ==\n")
flags = []
flags.append("PK" if result.is_primary_key_table else "Append")
flags.append(result.bucket_mode)
if result.deletion_vectors_enabled:
flags.append("dv=on")
if result.data_evolution_enabled:
flags.append("data-evolution=on")
_line(out, "Table", "{} ({})".format(result.table_identifier, ", ".join(flags)))
if result.snapshot_id is None:
_line(out, "Snapshot", "<none> (table is empty or has no snapshot)")
else:
schema_part = "" if result.schema_id is None else " (schema {})".format(result.schema_id)
_line(out, "Snapshot", "{}{}".format(result.snapshot_id, schema_part))
_line(out, "Predicate", result.predicate if result.predicate else "<none>")
_line(out, "Projection",
"[{}]".format(", ".join(result.projection)) if result.projection else "<all columns>")
_line(out, "Limit", str(result.limit) if result.limit is not None else "<none>")
out.write("\n")
_line(out, "Partition pruning",
result.partition_pruning.format() if result.partition_pruning else "n/a")
_line(out, "Bucket pruning",
result.bucket_pruning.format() if result.bucket_pruning else "n/a")
_line(out, "File skipping",
result.file_skipping.format() if result.file_skipping else "n/a")
out.write("\n")
_line(out, "Splits", str(result.split_count))
if result.split_count > 0:
out.write(" raw-convertible: {} / {}\n".format(
result.splits_raw_convertible, result.split_count))
out.write(" with DV: {} / {}\n".format(
result.splits_with_deletion_vectors, result.split_count))
out.write(" all-above-L0: {} / {}\n".format(
result.splits_all_above_l0, result.split_count))
out.write(" files/split: min={} max={} avg={:.2f}\n".format(
result.files_per_split_min,
result.files_per_split_max,
result.files_per_split_avg))
out.write(" size/split: min={} p50={} p95={} max={}\n".format(
_format_size(result.split_size_min),
_format_size(result.split_size_p50),
_format_size(result.split_size_p95),
_format_size(result.split_size_max)))
out.write("\n")
_line(out, "Files", str(result.file_count))
_line(out, "Total size", _format_size(result.total_file_size))
rows = "{:,}".format(result.estimated_row_count)
if result.estimated_merged_row_count is not None:
rows += " (merged: {:,})".format(result.estimated_merged_row_count)
_line(out, "Estimated rows", rows)
if result.level_histogram:
levels = sorted(result.level_histogram.items())
levels_str = " ".join("L{}={}".format(lv, cnt) for lv, cnt in levels)
_line(out, "Level histogram", levels_str)
_line(out, "Deletion files", str(result.deletion_file_count))
if result.splits:
out.write("\nSplits[]\n")
for idx, split in enumerate(result.splits):
out.write(" [{}] partition={} bucket={} files={} size={} rows={} raw={} dv={}\n".format(
idx,
split.partition,
split.bucket,
split.file_count,
_format_size(split.file_size),
split.row_count,
split.raw_convertible,
split.has_deletion_vectors,
))
if split.level_histogram:
levels = sorted(split.level_histogram.items())
out.write(" levels: {}\n".format(
" ".join("L{}={}".format(lv, cnt) for lv, cnt in levels)))
for path in split.file_paths:
out.write(" file: {}\n".format(path))
return out.getvalue().rstrip("\n")
def _line(out: io.StringIO, label: str, value: str) -> None:
out.write("{:<19} {}\n".format(label + ":", value))
_SIZE_UNITS = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")
def _format_size(num_bytes: int) -> str:
if num_bytes is None:
return "?"
size = float(num_bytes)
for unit in _SIZE_UNITS:
if size < 1024.0 or unit == _SIZE_UNITS[-1]:
if unit == "B":
return "{:d} {}".format(int(size), unit)
return "{:.1f} {}".format(size, unit)
size /= 1024.0
return "{:.1f} {}".format(size, _SIZE_UNITS[-1])