blob: ddd5e8c6c6f02eee903ce565781929f628a5ba0f [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""The ``$buckets`` system table — per-bucket aggregated stats."""
from typing import List, Optional
import pyarrow
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.schema.data_types import AtomicType, DataField, RowType
from pypaimon.table.system.system_table import SystemTable
# Row schema of the ``$buckets`` system table; ``BucketsTable.row_type()``
# returns this object unchanged.
# NOTE(review): the leading ``False`` is presumably RowType's nullability
# flag -- confirm against the RowType constructor.
TABLE_TYPE = RowType(False, [
    # Rendered partition spec ("name=value/name2=value2"); null when the
    # manifest entry carries no partition fields.
    DataField(0, "partition", AtomicType("STRING", nullable=True)),
    # Bucket id taken from the manifest entry.
    DataField(1, "bucket", AtomicType("INT", nullable=False)),
    # Sum of the row counts of all files aggregated into the bucket.
    DataField(2, "record_count", AtomicType("BIGINT", nullable=False)),
    # Sum of the file sizes of all files aggregated into the bucket.
    DataField(3, "file_size_in_bytes", AtomicType("BIGINT", nullable=False)),
    # Number of manifest entries aggregated into the bucket.
    DataField(4, "file_count", AtomicType("BIGINT", nullable=False)),
    # Largest file creation time (epoch millis) seen in the bucket; null when
    # no file reports a creation time.
    DataField(5, "last_update_time", AtomicType("TIMESTAMP(3)", nullable=True)),
])
# Arrow type used for the ``last_update_time`` column (millisecond unit).
_TIMESTAMP_TYPE = pyarrow.timestamp("ms")
class BucketsTable(SystemTable):
    """System table ``$buckets``: one row per (partition, bucket) pair.

    Rows are derived from the manifest entries of the latest snapshot and
    report, for every bucket, the total record count, the total file size,
    the number of files and the most recent file creation time.
    """

    def system_table_name(self) -> str:
        """Return the name of this system table, ``"buckets"``."""
        return "buckets"

    def row_type(self) -> RowType:
        """Return the module-level ``TABLE_TYPE`` schema."""
        return TABLE_TYPE

    def primary_keys(self) -> List[str]:
        """Return the key columns: ``partition`` and ``bucket``."""
        return ["partition", "bucket"]

    def _build_arrow_table(self) -> pyarrow.Table:
        """Aggregate the latest snapshot's manifest entries per bucket.

        Returns:
            An Arrow table with the columns ``partition``, ``bucket``,
            ``record_count``, ``file_size_in_bytes``, ``file_count`` and
            ``last_update_time``, ordered by rendered partition string and
            then bucket id. The table is empty when there is no snapshot.
        """
        latest = self.base_table.snapshot_manager().get_latest_snapshot()
        if latest is None:
            return self._empty_table()

        manifests = ManifestListManager(self.base_table).read_all(latest)
        entries = ManifestFileManager(self.base_table).read_entries_parallel(
            manifests, drop_stats=True)

        # Stand-in for null partition values inside the grouping key.
        null_marker = object()
        per_bucket: dict = {}
        for entry in entries:
            typed_spec = tuple(
                (field.name, null_marker if value is None else value)
                for field, value in zip(
                    entry.partition.fields, entry.partition.values))
            bucket_no = int(entry.bucket)
            group = (typed_spec, bucket_no)

            agg = per_bucket.get(group)
            if agg is None:
                # First file of this (partition, bucket): start from zero.
                agg = {
                    "render_items": tuple(
                        (name, None if value is null_marker else str(value))
                        for name, value in typed_spec),
                    "bucket": bucket_no,
                    "record_count": 0,
                    "file_size_in_bytes": 0,
                    "file_count": 0,
                    "last_update_time": None,
                }
                per_bucket[group] = agg

            data_file = entry.file
            agg["record_count"] += int(data_file.row_count)
            agg["file_size_in_bytes"] += int(data_file.file_size)
            agg["file_count"] += 1

            # Keep the newest creation time; files without one are ignored.
            created_ms = data_file.creation_time_epoch_millis()
            if created_ms is not None and (
                    agg["last_update_time"] is None
                    or created_ms > agg["last_update_time"]):
                agg["last_update_time"] = created_ms

        # Render each partition once, then order by (partition text, bucket).
        # A missing partition string sorts as "". The sort is stable, so
        # groups with equal sort keys keep their first-seen order.
        ordered = sorted(
            (
                (_render_partition(agg["render_items"]), agg)
                for agg in per_bucket.values()
            ),
            key=lambda pair: (pair[0] or "", pair[1]["bucket"]),
        )

        return pyarrow.table({
            "partition": pyarrow.array(
                [label for label, _ in ordered], type=pyarrow.string()),
            "bucket": pyarrow.array(
                [agg["bucket"] for _, agg in ordered],
                type=pyarrow.int32()),
            "record_count": pyarrow.array(
                [agg["record_count"] for _, agg in ordered],
                type=pyarrow.int64()),
            "file_size_in_bytes": pyarrow.array(
                [agg["file_size_in_bytes"] for _, agg in ordered],
                type=pyarrow.int64()),
            "file_count": pyarrow.array(
                [agg["file_count"] for _, agg in ordered],
                type=pyarrow.int64()),
            "last_update_time": pyarrow.array(
                [agg["last_update_time"] for _, agg in ordered],
                type=_TIMESTAMP_TYPE),
        })

    @staticmethod
    def _empty_table() -> pyarrow.Table:
        """Return a zero-row Arrow table with the ``$buckets`` columns."""
        column_types = (
            ("partition", pyarrow.string()),
            ("bucket", pyarrow.int32()),
            ("record_count", pyarrow.int64()),
            ("file_size_in_bytes", pyarrow.int64()),
            ("file_count", pyarrow.int64()),
            ("last_update_time", _TIMESTAMP_TYPE),
        )
        return pyarrow.table({
            name: pyarrow.array([], type=arrow_type)
            for name, arrow_type in column_types
        })
def _render_partition(spec_items) -> Optional[str]:
"""Render a partition spec as ``pt=v/pt2=v2`` or None when empty.
Null partition values are rendered as ``__NULL__`` to distinguish them
from the literal string ``"None"``. A partition whose value is
literally ``"__NULL__"`` will produce the same rendered string —
aggregation keys are still distinct, but the displayed partition
column will collide. This is a display-only limitation.
"""
if not spec_items:
return None
return "/".join(
"{}={}".format(name, "__NULL__" if value is None else value)
for name, value in spec_items)