Core: Fix missing files from transaction retries with conflicting manifest merges (#9230)
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 734cab2..e0919d9 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -171,16 +171,16 @@
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
if (newManifests != null) {
- List<ManifestFile> committedNewManifests = Lists.newArrayList();
+ boolean hasDeletes = false;
for (ManifestFile manifest : newManifests) {
- if (committed.contains(manifest)) {
- committedNewManifests.add(manifest);
- } else {
+ if (!committed.contains(manifest)) {
deleteFile(manifest.path());
+ hasDeletes = true;
}
}
-
- this.newManifests = committedNewManifests;
+ if (hasDeletes) {
+ this.newManifests = null;
+ }
}
// clean up only rewrittenAppendManifests as they are always owned by the table
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e06a491..864e916 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -879,16 +879,17 @@
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
if (cachedNewDataManifests != null) {
- List<ManifestFile> committedNewDataManifests = Lists.newArrayList();
+ boolean hasDeletes = false;
for (ManifestFile manifest : cachedNewDataManifests) {
- if (committed.contains(manifest)) {
- committedNewDataManifests.add(manifest);
- } else {
+ if (!committed.contains(manifest)) {
deleteFile(manifest.path());
+ hasDeletes = true;
}
}
- this.cachedNewDataManifests = committedNewDataManifests;
+ if (hasDeletes) {
+ this.cachedNewDataManifests = null;
+ }
}
ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 3b30758..74892db 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -765,4 +765,51 @@
Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists());
Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
}
+
+ @Test
+ public void testTransactionRecommit() {
+ // update table settings to merge when there are 3 manifests
+ table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "3").commit();
+
+ // create manifests so that the next commit will trigger a merge
+ table.newFastAppend().appendFile(FILE_A).commit();
+ table.newFastAppend().appendFile(FILE_B).commit();
+
+ // start a transaction with appended files that will merge
+ Transaction transaction = Transactions.newTransaction(table.name(), table.ops());
+
+ AppendFiles append = transaction.newAppend().appendFile(FILE_D);
+ Snapshot pending = append.apply();
+
+ Assert.assertEquals(
+ "Should produce 1 pending merged manifest", 1, pending.allManifests(table.io()).size());
+
+ // because a merge happened, the appended manifest is deleted the by append operation
+ append.commit();
+
+ // concurrently commit FILE_A without a transaction to cause the previous append to retry
+ table.newAppend().appendFile(FILE_C).commit();
+ Assert.assertEquals(
+ "Should produce 1 committed merged manifest",
+ 1,
+ table.currentSnapshot().allManifests(table.io()).size());
+
+ transaction.commitTransaction();
+
+ Set<String> paths =
+ Sets.newHashSet(
+ Iterables.transform(
+ table.newScan().planFiles(), task -> task.file().path().toString()));
+ Set<String> expectedPaths =
+ Sets.newHashSet(
+ FILE_A.path().toString(),
+ FILE_B.path().toString(),
+ FILE_C.path().toString(),
+ FILE_D.path().toString());
+
+ Assert.assertEquals("Should contain all committed files", expectedPaths, paths);
+
+ Assert.assertEquals(
+ "Should produce 2 manifests", 2, table.currentSnapshot().allManifests(table.io()).size());
+ }
}