Fix tracing existing entries when there are deletes (#1046)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 98bb885..f48e6f2 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -62,7 +62,7 @@
Reference,
)
from pyiceberg.expressions.visitors import (
- ROWS_CANNOT_MATCH,
+ ROWS_MIGHT_NOT_MATCH,
ROWS_MUST_MATCH,
_InclusiveMetricsEvaluator,
_StrictMetricsEvaluator,
@@ -3360,13 +3360,14 @@
existing_entries = []
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
+ # Based on the metadata, it can be dropped right away
deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
self._deleted_data_files.add(entry.data_file)
- elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
- existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
else:
- # Based on the metadata, it is unsure to say if the file can be deleted
- partial_rewrites_needed = True
+ # Based on the metadata, we cannot determine if it can be deleted
+ existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
+ if inclusive_metrics_evaluator(entry.data_file) != ROWS_MIGHT_NOT_MATCH:
+ partial_rewrites_needed = True
if len(deleted_entries) > 0:
total_deleted_entries += deleted_entries
@@ -3383,8 +3384,6 @@
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
existing_manifests.append(writer.to_manifest_file())
- # else:
- # deleted_manifests.append()
else:
existing_manifests.append(manifest_file)
else:
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 0716862..53bb25e 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -23,6 +23,7 @@
from typing import Any, Dict
from urllib.parse import urlparse
+import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
@@ -38,13 +39,20 @@
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
-from pyiceberg.expressions import In
+from pyiceberg.expressions import GreaterThanOrEqual, In, Not
from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
-from pyiceberg.transforms import IdentityTransform
-from pyiceberg.types import IntegerType, LongType, NestedField, StringType
+from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.types import (
+ DateType,
+ DoubleType,
+ IntegerType,
+ LongType,
+ NestedField,
+ StringType,
+)
from utils import _create_table
@@ -1331,3 +1339,71 @@
tbl.overwrite(data, In("id", ["1", "2", "3"]))
assert len(tbl.scan().to_arrow()) == 3
+
+
+@pytest.mark.integration
+def test_delete_threshold() -> None:
+ catalog = load_catalog(
+ "local",
+ **{
+ "type": "rest",
+ "uri": "http://localhost:8181",
+ "s3.endpoint": "http://localhost:9000",
+ "s3.access-key-id": "admin",
+ "s3.secret-access-key": "password",
+ },
+ )
+
+ schema = Schema(
+ NestedField(field_id=101, name="id", field_type=LongType(), required=True),
+ NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
+ NestedField(field_id=104, name="relevancy_score", field_type=DoubleType(), required=False),
+ )
+
+ partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))
+
+ try:
+ catalog.drop_table(
+ identifier="default.scores",
+ )
+ except NoSuchTableError:
+ pass
+
+ catalog.create_table(
+ identifier="default.scores",
+ schema=schema,
+ partition_spec=partition_spec,
+ )
+
+ # Parameters
+ num_rows = 100 # Number of rows in the dataframe
+ id_min, id_max = 1, 10000
+ date_start, date_end = date(2024, 1, 1), date(2024, 2, 1)
+
+ # Generate the 'id' column
+ id_column = np.random.randint(id_min, id_max, num_rows)
+
+ # Generate the 'created_at' column as dates only
+ date_range = pd.date_range(start=date_start, end=date_end, freq="D") # Daily frequency for dates
+ created_at_column = np.random.choice(date_range, num_rows) # Convert to string (YYYY-MM-DD format)
+
+ # Generate the 'relevancy_score' column with a peak around 0.1
+ relevancy_score_column = np.random.beta(a=2, b=20, size=num_rows) # Adjusting parameters to peak around 0.1
+
+ # Create the dataframe
+ df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})
+
+ iceberg_table = catalog.load_table("default.scores")
+
+ # Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
+ arrow_schema = iceberg_table.schema().as_arrow()
+ docs_table = pa.Table.from_pandas(df, schema=arrow_schema)
+
+ # Append the data to the Iceberg table
+ iceberg_table.append(docs_table)
+
+ delete_condition = GreaterThanOrEqual("relevancy_score", 0.1)
+ lower_before = len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow())
+ assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
+ iceberg_table.delete(delete_condition)
+ assert len(iceberg_table.scan().to_arrow()) == lower_before