[HUDI-1304] Add unit test for compaction on replaced file groups (#2150)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 6992a82..094c0b3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -55,6 +55,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
@@ -200,6 +201,31 @@
}
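+ /**
+ * Runs the given pending compaction after its target file groups have been replaced, then verifies
+ * that the replaced file groups no longer appear in the latest file slices and that the compaction
+ * commit lands on the timeline with fully merged file slices.
+ */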
+ protected void executeCompactionWithReplacedFiles(String compactionInstantTime, SparkRDDWriteClient client, HoodieTable table,
+ HoodieWriteConfig cfg, String[] partitions, Set<HoodieFileGroupId> replacedFileIds) throws IOException {
+
+ client.compact(compactionInstantTime);
+ List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+ assertFalse(fileSliceList.isEmpty(), "Ensure latest file-slices are not empty");
+ assertFalse(fileSliceList.stream()
+ .anyMatch(fs -> replacedFileIds.contains(fs.getFileGroupId())),
+ "Compacted files should not show up in latest slices");
+
+ // verify that there is a commit
+ table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+ HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
+ // verify compaction commit is visible in timeline
+ assertTrue(timeline.getInstants()
+ .anyMatch(instant -> compactionInstantTime.equals(instant.getTimestamp())),
+ "Expect the compaction instant to show up in the completed commit timeline");
+ for (String partition : partitions) {
+ table.getSliceView().getLatestFileSlicesBeforeOrOn(partition, compactionInstantTime, true).forEach(fs -> {
+ // verify that all log files are merged
+ assertEquals(0, fs.getLogFiles().count(), "Expect all log files to be merged into the base file");
+ assertTrue(fs.getBaseFile().isPresent(), "Expect a base file to be present after compaction");
+ });
+ }
+ }
+
protected List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records, SparkRDDWriteClient client,
HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 8da1f3d..fd6bd83 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,11 +31,14 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -332,4 +336,52 @@
executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
}
}
+
+ @Test
+ public void testCompactionOnReplacedFiles() throws Exception {
+ // Schedule a compaction. Replace those file groups and ensure compaction completes successfully.
+ HoodieWriteConfig cfg = getConfig(true);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true)) {
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ String firstInstantTime = "001";
+ String secondInstantTime = "004";
+ String compactionInstantTime = "005";
+ String replaceInstantTime = "006";
+
+ int numRecs = 2000;
+
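+ // create two delta commits that the scheduled compaction will operate on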
+ List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
+ runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
+ new ArrayList<>());
+
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
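+ // schedule compaction at instant 005; it stays pending until executed at the end of the test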
+ scheduleCompaction(compactionInstantTime, client, cfg);
+ metaClient.reloadActiveTimeline();
+ HoodieInstant pendingCompactionInstant =
+ metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
+ assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
+
+ Set<HoodieFileGroupId> fileGroupsBeforeReplace = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
+ // replace the existing file groups using insertOverwrite
+ JavaRDD<HoodieRecord> replaceRecords = jsc.parallelize(dataGen.generateInserts(replaceInstantTime, numRecs), 1);
+ client.startCommitWithTime(replaceInstantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+ client.insertOverwrite(replaceRecords, replaceInstantTime);
+
+ metaClient.reloadActiveTimeline();
+ hoodieTable = getHoodieTable(metaClient, cfg);
+ Set<HoodieFileGroupId> newFileGroups = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
+ // make sure earlier file groups are not visible
+ assertEquals(0, newFileGroups.stream().filter(fileGroupsBeforeReplace::contains).count(),
+ "Expect replaced file groups to no longer be visible in the latest file slices");
+
+ // compaction should still complete even though its associated file groups have been replaced
+ executeCompactionWithReplacedFiles(compactionInstantTime, client, hoodieTable, cfg, dataGen.getPartitionPaths(), fileGroupsBeforeReplace);
+ }
+ }
+
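+ // collects the file group ids of the latest file slices across the given partitions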
+ private Set<HoodieFileGroupId> getAllFileGroups(HoodieTable table, String[] partitions) {
+ return Arrays.stream(partitions).flatMap(partition -> table.getSliceView().getLatestFileSlices(partition)
+ .map(fileSlice -> fileSlice.getFileGroupId())).collect(Collectors.toSet());
+ }
}