Fix DELETED manifest entry snapshot_id in OverwriteFiles (#3237)
# Rationale for this change
When `_OverwriteFiles._deleted_entries()` creates DELETED manifest
entries, it now sets `snapshot_id` to the current (deleting) snapshot's ID
instead of retaining the original INSERT snapshot's ID.
Closes #3236
According to the [Iceberg spec (Manifest Entry
Fields)](https://iceberg.apache.org/spec/#manifest-entry-fields),
`snapshot_id` for a DELETED entry (status=2) should be the ID of the
snapshot in which the file was deleted. However,
`_OverwriteFiles._deleted_entries()` was copying the original entry's
`snapshot_id` (from the INSERT snapshot) into the new DELETED entry.
This caused downstream consumers that filter manifest entries by
`snapshot_id` (e.g. Iceberg Java's `IncrementalChangelogScan`) to
silently miss DELETED files, breaking CDC pipelines.
## Are these changes tested?
Added `test_manifest_entry_snapshot_id_after_partial_deletes` in
`tests/integration/test_deletes.py`.
## Are there any user-facing changes?
N/A
---------
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 37d1209..1df4e64 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -652,7 +652,7 @@
return [
ManifestEntry.from_args(
status=ManifestEntryStatus.DELETED,
- snapshot_id=entry.snapshot_id,
+ snapshot_id=self._snapshot_id,
sequence_number=entry.sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index e3b487e..c49689b 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -975,3 +975,52 @@
assert after_delete_snapshot is not None
assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id)
+
+
+@pytest.mark.integration
+def test_manifest_entry_snapshot_id_after_partial_deletes(session_catalog: RestCatalog) -> None:
+ identifier = "default.test_manifest_entry_snapshot_id_after_partial_deletes"
+ try:
+ session_catalog.drop_table(identifier)
+ except NoSuchTableError:
+ pass
+
+ schema = pa.schema(
+ [
+ ("id", pa.int32()),
+ ("name", pa.string()),
+ ]
+ )
+
+ table = session_catalog.create_table(identifier, schema)
+ data = pa.Table.from_pylist(
+ [
+ {"id": 1, "name": "keep"},
+ {"id": 2, "name": "keep"},
+ {"id": 3, "name": "delete_me"},
+ {"id": 4, "name": "delete_me"},
+ ],
+ schema=schema,
+ )
+ table.append(data)
+
+ # Partial delete: only some rows match, triggering CoW overwrite via _OverwriteFiles
+ table.delete(EqualTo("name", "delete_me"))
+ after_delete_snapshot = table.refresh().current_snapshot()
+ assert after_delete_snapshot is not None
+
+ manifests = after_delete_snapshot.manifests(table.io)
+ deleted_entries = [
+ entry
+ for manifest in manifests
+ for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False)
+ if entry.status == ManifestEntryStatus.DELETED
+ ]
+
+ assert len(deleted_entries) > 0, "Expected at least one DELETED manifest entry from the CoW overwrite"
+
+ for entry in deleted_entries:
+ assert entry.snapshot_id == after_delete_snapshot.snapshot_id, (
+ f"DELETED entry snapshot_id should be {after_delete_snapshot.snapshot_id} "
+ f"(the deleting snapshot), but was {entry.snapshot_id}"
+ )