perf: optimize `inspect.partitions` (#2359)
Parallelizes manifest processing to improve performance for large tables
with many manifest files. The per-manifest partition maps are then merged
to produce the final aggregated result.
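In essence, each manifest is processed independently and the per-manifest maps are reduced afterwards. A minimal sketch of that fan-out/merge pattern, assuming a hypothetical `process_manifest` helper and a reduced set of counters (the full set appears in the diff below):

```python
# Sketch only: process_manifest is a stand-in for InspectTable._process_manifest.
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Iterable, Tuple

PartitionKey = Tuple[str, Any]

def merge_partition_maps(
    local_maps: Iterable[Dict[PartitionKey, Dict[str, Any]]],
) -> Dict[PartitionKey, Dict[str, Any]]:
    merged: Dict[PartitionKey, Dict[str, Any]] = {}
    for local_map in local_maps:
        for key, row in local_map.items():
            existing = merged.get(key)
            if existing is None:
                merged[key] = row
                continue
            # Counters are additive across manifests (reduced set for brevity).
            for counter in ("record_count", "file_count", "total_data_file_size_in_bytes"):
                existing[counter] += row[counter]
            # Keep the metadata of the most recent snapshot touching the partition.
            if row["last_updated_at"] and (
                not existing["last_updated_at"] or row["last_updated_at"] > existing["last_updated_at"]
            ):
                existing["last_updated_at"] = row["last_updated_at"]
                existing["last_updated_snapshot_id"] = row["last_updated_snapshot_id"]
    return merged

# Usage, given manifests and a process_manifest(manifest) -> partition map helper:
# with ThreadPoolExecutor() as executor:
#     merged = merge_partition_maps(executor.map(process_manifest, manifests))
```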
# Rationale for this change
Performance improvement: we experienced slowness with
`table.inspect.partitions()` on large tables.
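For reference, a minimal sketch of the call in question (the catalog and table names here are assumptions):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # assumed catalog name
table = catalog.load_table("db.events")  # assumed table identifier

# Returns a pyarrow.Table with one row of aggregated stats per partition;
# before this change, every manifest entry was scanned sequentially.
partitions = table.inspect.partitions()
```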
# Are these changes tested?
Yes.
# Are there any user-facing changes?
No.
---------
Co-authored-by: Hanzhi Wang <hanzhi_wang@apple.com>
Co-authored-by: Fokko Driesprong <fokko@apache.org>

diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index 3bb0268..c3aa870 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -20,7 +20,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
+from pyiceberg.manifest import DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
@@ -288,17 +288,51 @@
table_schema = pa.unify_schemas([partitions_schema, table_schema])
- def update_partitions_map(
- partitions_map: Dict[Tuple[str, Any], Any],
- file: DataFile,
- partition_record_dict: Dict[str, Any],
- snapshot: Optional[Snapshot],
- ) -> None:
+ snapshot = self._get_snapshot(snapshot_id)
+ executor = ExecutorFactory.get_or_create()
+ local_partitions_maps = executor.map(self._process_manifest, snapshot.manifests(self.tbl.io))
+
+ partitions_map: Dict[Tuple[str, Any], Any] = {}
+ for local_map in local_partitions_maps:
+ for partition_record_key, partition_row in local_map.items():
+ if partition_record_key not in partitions_map:
+ partitions_map[partition_record_key] = partition_row
+ else:
+ existing = partitions_map[partition_record_key]
+ existing["record_count"] += partition_row["record_count"]
+ existing["file_count"] += partition_row["file_count"]
+ existing["total_data_file_size_in_bytes"] += partition_row["total_data_file_size_in_bytes"]
+ existing["position_delete_record_count"] += partition_row["position_delete_record_count"]
+ existing["position_delete_file_count"] += partition_row["position_delete_file_count"]
+ existing["equality_delete_record_count"] += partition_row["equality_delete_record_count"]
+ existing["equality_delete_file_count"] += partition_row["equality_delete_file_count"]
+
+ if partition_row["last_updated_at"] and (
+ not existing["last_updated_at"] or partition_row["last_updated_at"] > existing["last_updated_at"]
+ ):
+ existing["last_updated_at"] = partition_row["last_updated_at"]
+ existing["last_updated_snapshot_id"] = partition_row["last_updated_snapshot_id"]
+
+ return pa.Table.from_pylist(
+ partitions_map.values(),
+ schema=table_schema,
+ )
+
+ def _process_manifest(self, manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
+ partitions_map: Dict[Tuple[str, Any], Any] = {}
+ for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+ partition = entry.data_file.partition
+ partition_record_dict = {
+ field.name: partition[pos]
+ for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+ }
+ entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
+
partition_record_key = _convert_to_hashable_type(partition_record_dict)
if partition_record_key not in partitions_map:
partitions_map[partition_record_key] = {
"partition": partition_record_dict,
- "spec_id": file.spec_id,
+ "spec_id": entry.data_file.spec_id,
"record_count": 0,
"file_count": 0,
"total_data_file_size_in_bytes": 0,
@@ -306,46 +340,34 @@
"position_delete_file_count": 0,
"equality_delete_record_count": 0,
"equality_delete_file_count": 0,
- "last_updated_at": snapshot.timestamp_ms if snapshot else None,
- "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
+ "last_updated_at": entry_snapshot.timestamp_ms if entry_snapshot else None,
+ "last_updated_snapshot_id": entry_snapshot.snapshot_id if entry_snapshot else None,
}
partition_row = partitions_map[partition_record_key]
- if snapshot is not None:
- if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
- partition_row["last_updated_at"] = snapshot.timestamp_ms
- partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
+ if entry_snapshot is not None:
+ if (
+ partition_row["last_updated_at"] is None
+ or partition_row["last_updated_snapshot_id"] < entry_snapshot.timestamp_ms
+ ):
+ partition_row["last_updated_at"] = entry_snapshot.timestamp_ms
+ partition_row["last_updated_snapshot_id"] = entry_snapshot.snapshot_id
- if file.content == DataFileContent.DATA:
- partition_row["record_count"] += file.record_count
+ if entry.data_file.content == DataFileContent.DATA:
+ partition_row["record_count"] += entry.data_file.record_count
partition_row["file_count"] += 1
- partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
- elif file.content == DataFileContent.POSITION_DELETES:
- partition_row["position_delete_record_count"] += file.record_count
+ partition_row["total_data_file_size_in_bytes"] += entry.data_file.file_size_in_bytes
+ elif entry.data_file.content == DataFileContent.POSITION_DELETES:
+ partition_row["position_delete_record_count"] += entry.data_file.record_count
partition_row["position_delete_file_count"] += 1
- elif file.content == DataFileContent.EQUALITY_DELETES:
- partition_row["equality_delete_record_count"] += file.record_count
+ elif entry.data_file.content == DataFileContent.EQUALITY_DELETES:
+ partition_row["equality_delete_record_count"] += entry.data_file.record_count
partition_row["equality_delete_file_count"] += 1
else:
- raise ValueError(f"Unknown DataFileContent ({file.content})")
+ raise ValueError(f"Unknown DataFileContent ({entry.data_file.content})")
- partitions_map: Dict[Tuple[str, Any], Any] = {}
- snapshot = self._get_snapshot(snapshot_id)
- for manifest in snapshot.manifests(self.tbl.io):
- for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
- partition = entry.data_file.partition
- partition_record_dict = {
- field.name: partition[pos]
- for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
- }
- entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
- update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
-
- return pa.Table.from_pylist(
- partitions_map.values(),
- schema=table_schema,
- )
+ return partitions_map
def _get_manifests_schema(self) -> "pa.Schema":
import pyarrow as pa