Add metadata tables for `data_files` and `delete_files` (#1066)
* Add metadata tables for data_files and delete_files
* Update API docs for `data_files` and `delete_files`
* Update method signature of `_files()`
* Migrate implementation of files() table from __init__.py
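For context, a minimal usage sketch of the inspect methods touched by this change; the catalog name `default` and the table identifier `default.my_table` are placeholders for illustration, not part of this PR:

```python
# Usage sketch for the new inspect methods. The catalog name "default" and the
# table "default.my_table" are assumed placeholders.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# All files in the current snapshot (data and delete files), as a PyArrow table.
all_files = tbl.inspect.files()

# Only data files (content == 0), and only delete files (content == 1 or 2).
data_files = tbl.inspect.data_files()
delete_files = tbl.inspect.delete_files()

print(all_files.column_names)  # ["content", "file_path", "file_format", ...]
print(data_files.num_rows, delete_files.num_rows)
```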
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 53a7846..eaffb84 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -845,6 +845,11 @@
[6.0989]]
```
+!!! info
+ Content refers to the type of content stored by the data file: `0` - `Data`, `1` - `Position Deletes`, `2` - `Equality Deletes`
+
+To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively.
+
## Add Files
Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index 3f64255..470c00f 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -17,7 +17,7 @@
from __future__ import annotations
from datetime import datetime
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
@@ -473,7 +473,7 @@
return pa.Table.from_pylist(history, schema=history_schema)
- def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+ def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
import pyarrow as pa
from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -530,6 +530,8 @@
for manifest_list in snapshot.manifests(io):
for manifest_entry in manifest_list.fetch_manifest_entry(io):
data_file = manifest_entry.data_file
+ if data_file_filter and data_file.content not in data_file_filter:
+ continue
column_sizes = data_file.column_sizes or {}
value_counts = data_file.value_counts or {}
null_value_counts = data_file.null_value_counts or {}
@@ -558,12 +560,12 @@
"spec_id": data_file.spec_id,
"record_count": data_file.record_count,
"file_size_in_bytes": data_file.file_size_in_bytes,
- "column_sizes": dict(data_file.column_sizes),
- "value_counts": dict(data_file.value_counts),
- "null_value_counts": dict(data_file.null_value_counts),
- "nan_value_counts": dict(data_file.nan_value_counts),
- "lower_bounds": dict(data_file.lower_bounds),
- "upper_bounds": dict(data_file.upper_bounds),
+ "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
+ "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
+ "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
+ "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
+ "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
+ "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
"key_metadata": data_file.key_metadata,
"split_offsets": data_file.split_offsets,
"equality_ids": data_file.equality_ids,
@@ -575,3 +577,12 @@
files,
schema=files_schema,
)
+
+ def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+ return self._files(snapshot_id)
+
+ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+ return self._files(snapshot_id, {DataFileContent.DATA})
+
+ def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+ return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 9f63225..68b10f3 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -672,126 +672,141 @@
# append more data
tbl.append(arrow_table_with_null)
- df = tbl.refresh().inspect.files()
+ # configure table properties
+ if format_version == 2:
+ with tbl.transaction() as txn:
+ txn.set_properties({"write.delete.mode": "merge-on-read"})
+ spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
- assert df.column_names == [
- "content",
- "file_path",
- "file_format",
- "spec_id",
- "record_count",
- "file_size_in_bytes",
- "column_sizes",
- "value_counts",
- "null_value_counts",
- "nan_value_counts",
- "lower_bounds",
- "upper_bounds",
- "key_metadata",
- "split_offsets",
- "equality_ids",
- "sort_order_id",
- "readable_metrics",
- ]
+ files_df = tbl.refresh().inspect.files()
- # make sure the non-nullable fields are filled
- for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
- for value in df[int_column]:
- assert isinstance(value.as_py(), int)
+ data_files_df = tbl.inspect.data_files()
- for split_offsets in df["split_offsets"]:
- assert isinstance(split_offsets.as_py(), list)
+ delete_files_df = tbl.inspect.delete_files()
- for file_format in df["file_format"]:
- assert file_format.as_py() == "PARQUET"
-
- for file_path in df["file_path"]:
- assert file_path.as_py().startswith("s3://")
-
- lhs = df.to_pandas()
- rhs = spark.table(f"{identifier}.files").toPandas()
-
- lhs_subset = lhs[
- [
+ def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
+ assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
+ "column_sizes",
+ "value_counts",
+ "null_value_counts",
+ "nan_value_counts",
+ "lower_bounds",
+ "upper_bounds",
+ "key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
+ "readable_metrics",
]
- ]
- rhs_subset = rhs[
- [
- "content",
- "file_path",
- "file_format",
- "spec_id",
- "record_count",
- "file_size_in_bytes",
- "split_offsets",
- "equality_ids",
- "sort_order_id",
+
+ # make sure the non-nullable fields are filled
+ for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
+ for value in df[int_column]:
+ assert isinstance(value.as_py(), int)
+
+ for split_offsets in df["split_offsets"]:
+ assert isinstance(split_offsets.as_py(), list)
+
+ for file_format in df["file_format"]:
+ assert file_format.as_py() == "PARQUET"
+
+ for file_path in df["file_path"]:
+ assert file_path.as_py().startswith("s3://")
+
+ lhs = df.to_pandas()
+ rhs = spark_df.toPandas()
+
+ lhs_subset = lhs[
+ [
+ "content",
+ "file_path",
+ "file_format",
+ "spec_id",
+ "record_count",
+ "file_size_in_bytes",
+ "split_offsets",
+ "equality_ids",
+ "sort_order_id",
+ ]
]
- ]
+ rhs_subset = rhs[
+ [
+ "content",
+ "file_path",
+ "file_format",
+ "spec_id",
+ "record_count",
+ "file_size_in_bytes",
+ "split_offsets",
+ "equality_ids",
+ "sort_order_id",
+ ]
+ ]
- assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
+ assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False)
- for column in df.column_names:
- for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
- if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
- # NaN != NaN in Python
- continue
- if column in [
- "column_sizes",
- "value_counts",
- "null_value_counts",
- "nan_value_counts",
- "lower_bounds",
- "upper_bounds",
- ]:
- if isinstance(right, dict):
- left = dict(left)
- assert left == right, f"Difference in column {column}: {left} != {right}"
+ for column in df.column_names:
+ for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+ if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+ # NaN != NaN in Python
+ continue
+ if column in [
+ "column_sizes",
+ "value_counts",
+ "null_value_counts",
+ "nan_value_counts",
+ "lower_bounds",
+ "upper_bounds",
+ ]:
+ if isinstance(right, dict):
+ left = dict(left)
+ assert left == right, f"Difference in column {column}: {left} != {right}"
- elif column == "readable_metrics":
- assert list(left.keys()) == [
- "bool",
- "string",
- "string_long",
- "int",
- "long",
- "float",
- "double",
- "timestamp",
- "timestamptz",
- "date",
- "binary",
- "fixed",
- ]
- assert left.keys() == right.keys()
+ elif column == "readable_metrics":
+ assert list(left.keys()) == [
+ "bool",
+ "string",
+ "string_long",
+ "int",
+ "long",
+ "float",
+ "double",
+ "timestamp",
+ "timestamptz",
+ "date",
+ "binary",
+ "fixed",
+ ]
+ assert left.keys() == right.keys()
- for rm_column in left.keys():
- rm_lhs = left[rm_column]
- rm_rhs = right[rm_column]
+ for rm_column in left.keys():
+ rm_lhs = left[rm_column]
+ rm_rhs = right[rm_column]
- assert rm_lhs["column_size"] == rm_rhs["column_size"]
- assert rm_lhs["value_count"] == rm_rhs["value_count"]
- assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
- assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
+ assert rm_lhs["column_size"] == rm_rhs["column_size"]
+ assert rm_lhs["value_count"] == rm_rhs["value_count"]
+ assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
+ assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]
- if rm_column == "timestamptz":
- # PySpark does not correctly set the timstamptz
- rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
- rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
+ if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]:
+ # PySpark does not correctly set the timestamptz
+ rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
+ rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
- assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
- assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
- else:
- assert left == right, f"Difference in column {column}: {left} != {right}"
+ assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
+ assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
+ else:
+ assert left == right, f"Difference in column {column}: {left} != {right}"
+
+ inspect_files_asserts(files_df, spark.table(f"{identifier}.files"))
+ inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files"))
+ inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files"))
@pytest.mark.integration
@@ -801,26 +816,33 @@
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
- df = tbl.refresh().inspect.files()
+ files_df = tbl.refresh().inspect.files()
+ data_files_df = tbl.inspect.data_files()
+ delete_files_df = tbl.inspect.delete_files()
- assert df.column_names == [
- "content",
- "file_path",
- "file_format",
- "spec_id",
- "record_count",
- "file_size_in_bytes",
- "column_sizes",
- "value_counts",
- "null_value_counts",
- "nan_value_counts",
- "lower_bounds",
- "upper_bounds",
- "key_metadata",
- "split_offsets",
- "equality_ids",
- "sort_order_id",
- "readable_metrics",
- ]
+ def inspect_files_asserts(df: pa.Table) -> None:
+ assert df.column_names == [
+ "content",
+ "file_path",
+ "file_format",
+ "spec_id",
+ "record_count",
+ "file_size_in_bytes",
+ "column_sizes",
+ "value_counts",
+ "null_value_counts",
+ "nan_value_counts",
+ "lower_bounds",
+ "upper_bounds",
+ "key_metadata",
+ "split_offsets",
+ "equality_ids",
+ "sort_order_id",
+ "readable_metrics",
+ ]
- assert df.to_pandas().empty is True
+ assert df.to_pandas().empty is True
+
+ inspect_files_asserts(files_df)
+ inspect_files_asserts(data_files_df)
+ inspect_files_asserts(delete_files_df)
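For reference, the filtering behind `data_files()` and `delete_files()` hinges on the `DataFileContent` enum (`DATA = 0`, `POSITION_DELETES = 1`, `EQUALITY_DELETES = 2`). The sketch below illustrates the same include/skip decision made per manifest entry in `_files()`; the `keep()` helper is hypothetical and stands in for the `continue` in the real loop:

```python
# Illustration of the content filter used by _files(); keep() is a stand-in
# helper for the `if data_file_filter and data_file.content not in
# data_file_filter: continue` check in the loop over manifest entries.
from typing import Optional, Set

from pyiceberg.manifest import DataFileContent


def keep(content: DataFileContent, data_file_filter: Optional[Set[DataFileContent]]) -> bool:
    # Keep the entry when no filter is given, or when its content is in the filter.
    return not data_file_filter or content in data_file_filter


# files(): no filter, every entry is kept.
assert keep(DataFileContent.DATA, None)
assert keep(DataFileContent.POSITION_DELETES, None)

# data_files(): only DATA (content == 0) survives.
assert keep(DataFileContent.DATA, {DataFileContent.DATA})
assert not keep(DataFileContent.EQUALITY_DELETES, {DataFileContent.DATA})

# delete_files(): only position (1) and equality (2) deletes survive.
deletes = {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}
assert not keep(DataFileContent.DATA, deletes)
assert keep(DataFileContent.POSITION_DELETES, deletes)
```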