perf: optimize `inspect.partitions` (#2359)

Parallelizes manifest processing to improve performance for large tables
with many manifest files. After parallel processing, merges the
resulting partition maps to produce the final aggregated result.
Previous example ref: e937f6a1811c9e090552a4ae2015a8032e7ea910

<!--
Thanks for opening a pull request!
-->

<!-- In the case this PR will resolve an issue, please replace
${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
<!-- Closes #${GITHUB_ISSUE_ID} -->

# Rationale for this change
Perf improvement.
We experienced slowness with table.inspect.partitions() with large
table.

# Are these changes tested?
Yes.

# Are there any user-facing changes?
No.
<!-- In the case of user-facing changes, please add the changelog label.
-->

---------

Co-authored-by: Hanzhi Wang <hanzhi_wang@apple.com>
Co-authored-by: Fokko Driesprong <fokko@apache.org>
diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index 3bb0268..c3aa870 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -20,7 +20,7 @@
 from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
+from pyiceberg.manifest import DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.table.snapshots import Snapshot, ancestors_of
 from pyiceberg.types import PrimitiveType
@@ -288,17 +288,51 @@
 
             table_schema = pa.unify_schemas([partitions_schema, table_schema])
 
-        def update_partitions_map(
-            partitions_map: Dict[Tuple[str, Any], Any],
-            file: DataFile,
-            partition_record_dict: Dict[str, Any],
-            snapshot: Optional[Snapshot],
-        ) -> None:
+        snapshot = self._get_snapshot(snapshot_id)
+        executor = ExecutorFactory.get_or_create()
+        local_partitions_maps = executor.map(self._process_manifest, snapshot.manifests(self.tbl.io))
+
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        for local_map in local_partitions_maps:
+            for partition_record_key, partition_row in local_map.items():
+                if partition_record_key not in partitions_map:
+                    partitions_map[partition_record_key] = partition_row
+                else:
+                    existing = partitions_map[partition_record_key]
+                    existing["record_count"] += partition_row["record_count"]
+                    existing["file_count"] += partition_row["file_count"]
+                    existing["total_data_file_size_in_bytes"] += partition_row["total_data_file_size_in_bytes"]
+                    existing["position_delete_record_count"] += partition_row["position_delete_record_count"]
+                    existing["position_delete_file_count"] += partition_row["position_delete_file_count"]
+                    existing["equality_delete_record_count"] += partition_row["equality_delete_record_count"]
+                    existing["equality_delete_file_count"] += partition_row["equality_delete_file_count"]
+
+                    if partition_row["last_updated_at"] and (
+                        not existing["last_updated_at"] or partition_row["last_updated_at"] > existing["last_updated_at"]
+                    ):
+                        existing["last_updated_at"] = partition_row["last_updated_at"]
+                        existing["last_updated_snapshot_id"] = partition_row["last_updated_snapshot_id"]
+
+        return pa.Table.from_pylist(
+            partitions_map.values(),
+            schema=table_schema,
+        )
+
+    def _process_manifest(self, manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+            partition = entry.data_file.partition
+            partition_record_dict = {
+                field.name: partition[pos]
+                for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+            }
+            entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
+
             partition_record_key = _convert_to_hashable_type(partition_record_dict)
             if partition_record_key not in partitions_map:
                 partitions_map[partition_record_key] = {
                     "partition": partition_record_dict,
-                    "spec_id": file.spec_id,
+                    "spec_id": entry.data_file.spec_id,
                     "record_count": 0,
                     "file_count": 0,
                     "total_data_file_size_in_bytes": 0,
@@ -306,46 +340,34 @@
                     "position_delete_file_count": 0,
                     "equality_delete_record_count": 0,
                     "equality_delete_file_count": 0,
-                    "last_updated_at": snapshot.timestamp_ms if snapshot else None,
-                    "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
+                    "last_updated_at": entry_snapshot.timestamp_ms if entry_snapshot else None,
+                    "last_updated_snapshot_id": entry_snapshot.snapshot_id if entry_snapshot else None,
                 }
 
             partition_row = partitions_map[partition_record_key]
 
-            if snapshot is not None:
-                if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
-                    partition_row["last_updated_at"] = snapshot.timestamp_ms
-                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
+            if entry_snapshot is not None:
+                if (
+                    partition_row["last_updated_at"] is None
+                    or partition_row["last_updated_snapshot_id"] < entry_snapshot.timestamp_ms
+                ):
+                    partition_row["last_updated_at"] = entry_snapshot.timestamp_ms
+                    partition_row["last_updated_snapshot_id"] = entry_snapshot.snapshot_id
 
-            if file.content == DataFileContent.DATA:
-                partition_row["record_count"] += file.record_count
+            if entry.data_file.content == DataFileContent.DATA:
+                partition_row["record_count"] += entry.data_file.record_count
                 partition_row["file_count"] += 1
-                partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
-            elif file.content == DataFileContent.POSITION_DELETES:
-                partition_row["position_delete_record_count"] += file.record_count
+                partition_row["total_data_file_size_in_bytes"] += entry.data_file.file_size_in_bytes
+            elif entry.data_file.content == DataFileContent.POSITION_DELETES:
+                partition_row["position_delete_record_count"] += entry.data_file.record_count
                 partition_row["position_delete_file_count"] += 1
-            elif file.content == DataFileContent.EQUALITY_DELETES:
-                partition_row["equality_delete_record_count"] += file.record_count
+            elif entry.data_file.content == DataFileContent.EQUALITY_DELETES:
+                partition_row["equality_delete_record_count"] += entry.data_file.record_count
                 partition_row["equality_delete_file_count"] += 1
             else:
-                raise ValueError(f"Unknown DataFileContent ({file.content})")
+                raise ValueError(f"Unknown DataFileContent ({entry.data_file.content})")
 
-        partitions_map: Dict[Tuple[str, Any], Any] = {}
-        snapshot = self._get_snapshot(snapshot_id)
-        for manifest in snapshot.manifests(self.tbl.io):
-            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
-                partition = entry.data_file.partition
-                partition_record_dict = {
-                    field.name: partition[pos]
-                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
-                }
-                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
-                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
-
-        return pa.Table.from_pylist(
-            partitions_map.values(),
-            schema=table_schema,
-        )
+        return partitions_map
 
     def _get_manifests_schema(self) -> "pa.Schema":
         import pyarrow as pa