blob: 1b3bb4bb106fe51a132463103c4fbad34edd9394 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""The ``$files`` system table — per-data-file detail of the latest snapshot."""
import json
from typing import Any, List, Optional
import pyarrow
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
RowType)
from pypaimon.table.system.system_table import SystemTable
# Logical row type of the ``$files`` system table. Each field's id is the
# position of its (name, type) pair in the list below.
TABLE_TYPE = RowType(False, [
    DataField(field_id, field_name, field_type)
    for field_id, (field_name, field_type) in enumerate([
        ("partition", AtomicType("STRING", nullable=True)),
        ("bucket", AtomicType("INT", nullable=False)),
        ("file_path", AtomicType("STRING", nullable=False)),
        ("file_format", AtomicType("STRING", nullable=False)),
        ("schema_id", AtomicType("BIGINT", nullable=False)),
        ("level", AtomicType("INT", nullable=False)),
        ("record_count", AtomicType("BIGINT", nullable=False)),
        ("file_size_in_bytes", AtomicType("BIGINT", nullable=False)),
        ("min_key", AtomicType("STRING", nullable=True)),
        ("max_key", AtomicType("STRING", nullable=True)),
        ("null_value_counts", AtomicType("STRING", nullable=False)),
        ("min_value_stats", AtomicType("STRING", nullable=False)),
        ("max_value_stats", AtomicType("STRING", nullable=False)),
        ("min_sequence_number", AtomicType("BIGINT", nullable=True)),
        ("max_sequence_number", AtomicType("BIGINT", nullable=True)),
        ("creation_time", AtomicType("TIMESTAMP(3)", nullable=True)),
        # ``deleteRowCount`` is intentionally camelCase to keep the on-wire
        # column name stable.
        ("deleteRowCount", AtomicType("BIGINT", nullable=True)),
        ("file_source", AtomicType("STRING", nullable=True)),
        ("first_row_id", AtomicType("BIGINT", nullable=True)),
        ("write_cols", ArrayType(
            nullable=True,
            element_type=AtomicType("STRING", nullable=True))),
    ])
])

# Arrow type of ``creation_time``: millisecond precision, matching the
# TIMESTAMP(3) logical type above.
_TIMESTAMP_TYPE = pyarrow.timestamp("ms")
# Arrow type of ``write_cols``: a list of strings.
_WRITE_COLS_TYPE = pyarrow.list_(pyarrow.string())
def _to_json(obj: Any) -> str:
return json.dumps(obj, separators=(',', ':'), ensure_ascii=False,
default=str)
def _stringify_path(value: Any) -> str:
if value is None:
return ""
if isinstance(value, bytes):
try:
return value.decode("utf-8")
except UnicodeDecodeError:
return value.hex()
return str(value)
def _stats_columns(file_meta, table_field_names: List[str]) -> List[str]:
cols = getattr(file_meta, "value_stats_cols", None)
if cols:
return list(cols)
return list(table_field_names)
def _to_python(value: Any) -> Any:
"""Render an internal-row cell value into a JSON-safe primitive."""
if value is None:
return None
if isinstance(value, bytes):
try:
return value.decode("utf-8")
except UnicodeDecodeError:
return value.hex()
if hasattr(value, "isoformat"):
try:
return value.isoformat()
except Exception:
return str(value)
if isinstance(value, (int, float, bool, str, list, dict)):
return value
return str(value)
def _render_key(row) -> Optional[str]:
    """Render a key row's ``values`` as a compact JSON array string.

    Returns ``None`` when *row* is ``None`` or carries no (or empty)
    ``values``. Each value is converted with ``_to_python`` first.
    """
    key_values = None if row is None else getattr(row, "values", None)
    if not key_values:
        return None
    return _to_json(list(map(_to_python, key_values)))
def _render_partition(partition_row) -> Optional[str]:
if partition_row is None:
return None
fields = getattr(partition_row, "fields", None) or []
values = getattr(partition_row, "values", None) or []
if not fields:
return None
return "/".join("{}={}".format(field.name, value)
for field, value in zip(fields, values))
def _render_stats_map(values: List[Any], columns: List[str]) -> str:
    """Render per-column statistic values as a compact JSON object string.

    *values* are paired with *columns* positionally; surplus entries on
    either side are ignored, and ``values=None`` yields ``"{}"``. Each value
    is converted with ``_to_python`` first.
    """
    stats_by_column = {}
    if values is not None:
        for column, value in zip(columns, values):
            stats_by_column[column] = _to_python(value)
    return _to_json(stats_by_column)
def _render_null_counts(null_counts: Optional[List[int]],
                        columns: List[str]) -> str:
    """Render per-column null counts as a compact JSON object string.

    Counts are paired with *columns* positionally and surplus entries on
    either side are ignored. ``None`` counts stay ``null``; other counts are
    coerced with ``int``. A ``None`` or empty *null_counts* yields ``"{}"``.
    """
    if not null_counts:
        return _to_json({})
    return _to_json({
        column: None if count is None else int(count)
        for column, count in zip(columns, null_counts)
    })
class FilesTable(SystemTable):
    """The ``$files`` system table.

    Produces one row per manifest entry returned by
    ``ManifestFileManager.read_entries_parallel`` for the latest snapshot,
    describing the data file's location, format, size, key range, per-column
    statistics and lineage metadata.
    """

    # Arrow schema of the materialized table. Column names and order follow
    # TABLE_TYPE. A single definition guarantees that the populated table and
    # the empty (no-snapshot) table always share exactly the same schema.
    _ARROW_SCHEMA = pyarrow.schema([
        ("partition", pyarrow.string()),
        ("bucket", pyarrow.int32()),
        ("file_path", pyarrow.string()),
        ("file_format", pyarrow.string()),
        ("schema_id", pyarrow.int64()),
        ("level", pyarrow.int32()),
        ("record_count", pyarrow.int64()),
        ("file_size_in_bytes", pyarrow.int64()),
        ("min_key", pyarrow.string()),
        ("max_key", pyarrow.string()),
        ("null_value_counts", pyarrow.string()),
        ("min_value_stats", pyarrow.string()),
        ("max_value_stats", pyarrow.string()),
        ("min_sequence_number", pyarrow.int64()),
        ("max_sequence_number", pyarrow.int64()),
        ("creation_time", _TIMESTAMP_TYPE),
        # camelCase on purpose, matching the TABLE_TYPE column name.
        ("deleteRowCount", pyarrow.int64()),
        ("file_source", pyarrow.string()),
        ("first_row_id", pyarrow.int64()),
        ("write_cols", _WRITE_COLS_TYPE),
    ])

    def system_table_name(self) -> str:
        return "files"

    def row_type(self) -> RowType:
        return TABLE_TYPE

    def primary_keys(self) -> List[str]:
        return ["file_path"]

    def _build_arrow_table(self) -> pyarrow.Table:
        """Materialize the ``$files`` rows for the latest snapshot.

        Returns a zero-row table with the same schema when the base table has
        no snapshot yet.
        """
        snapshot = self.base_table.snapshot_manager().get_latest_snapshot()
        if snapshot is None:
            return self._empty_table()

        manifest_list_manager = ManifestListManager(self.base_table)
        manifest_files = manifest_list_manager.read_all(snapshot)
        manifest_file_manager = ManifestFileManager(self.base_table)
        # Value statistics are rendered into the output, so keep them.
        entries = manifest_file_manager.read_entries_parallel(
            manifest_files, drop_stats=False)

        # Only used when a file name carries no format extension.
        default_format = self.base_table.options.file_format()
        table_field_names = list(self.base_table.field_names)

        columns = {name: [] for name in self._ARROW_SCHEMA.names}
        for entry in entries:
            row = self._entry_row(entry, default_format, table_field_names)
            for name, column in columns.items():
                column.append(row[name])
        return pyarrow.Table.from_pydict(columns, schema=self._ARROW_SCHEMA)

    @classmethod
    def _entry_row(cls, entry, default_format: str,
                   table_field_names: List[str]) -> dict:
        """Render one manifest entry as a ``column name -> cell value`` dict.

        Columns declared nullable in TABLE_TYPE map a missing source value to
        ``None`` instead of failing on ``int(None)``. A file without value
        statistics renders empty (``"{}"``) stats maps.
        """
        meta = entry.file
        # NOTE(review): when a file has no ``value_stats_cols``, its stats are
        # labelled with the *current* table field names; a file written under
        # an older ``schema_id`` may have a different column layout — confirm
        # how schema evolution should be applied here.
        stats_cols = _stats_columns(meta, table_field_names)
        value_stats = meta.value_stats
        if value_stats is None:
            null_counts, min_values, max_values = None, [], []
        else:
            null_counts = value_stats.null_counts
            min_values = getattr(value_stats.min_values, "values", []) or []
            max_values = getattr(value_stats.max_values, "values", []) or []

        return {
            "partition": _render_partition(entry.partition),
            "bucket": int(entry.bucket),
            "file_path": _stringify_path(meta.file_path or meta.file_name),
            "file_format": cls._file_format(meta.file_name, default_format),
            "schema_id": int(meta.schema_id),
            "level": int(meta.level),
            "record_count": int(meta.row_count),
            "file_size_in_bytes": int(meta.file_size),
            "min_key": _render_key(meta.min_key),
            "max_key": _render_key(meta.max_key),
            "null_value_counts": _render_null_counts(null_counts, stats_cols),
            "min_value_stats": _render_stats_map(min_values, stats_cols),
            "max_value_stats": _render_stats_map(max_values, stats_cols),
            "min_sequence_number": cls._optional_int(meta.min_sequence_number),
            "max_sequence_number": cls._optional_int(meta.max_sequence_number),
            "creation_time": meta.creation_time_epoch_millis(),
            "deleteRowCount": cls._optional_int(meta.delete_row_count),
            "file_source": (None if meta.file_source is None
                            else str(meta.file_source)),
            "first_row_id": cls._optional_int(meta.first_row_id),
            "write_cols": list(meta.write_cols) if meta.write_cols else None,
        }

    @staticmethod
    def _file_format(file_name: Any, default_format: str) -> str:
        """Return the format identifier carried by a data file's extension.

        Data files are expected to be named ``<name>.<format>`` (e.g.
        ``data-<uuid>-0.parquet``), the convention Java Paimon's
        ``FilesTable`` reads the column from. Deriving it per file keeps the
        value correct for files written before the table's format option was
        changed. Falls back to *default_format* when there is no extension.
        """
        base_name = _stringify_path(file_name).rsplit("/", 1)[-1]
        _, dot, extension = base_name.rpartition(".")
        return extension if dot and extension else default_format

    @staticmethod
    def _optional_int(value: Any) -> Optional[int]:
        """Coerce *value* to ``int``, passing ``None`` through unchanged."""
        return None if value is None else int(value)

    @staticmethod
    def _empty_table() -> pyarrow.Table:
        """Return a zero-row table with the ``$files`` Arrow schema."""
        schema = FilesTable._ARROW_SCHEMA
        return pyarrow.Table.from_pydict(
            {name: [] for name in schema.names}, schema=schema)