[HUDI-1304] Add unit test for compaction on replaced file groups (#2150)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 6992a82..094c0b3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -55,6 +55,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
@@ -200,6 +201,32 @@
 
   }
 
+  protected void executeCompactionWithReplacedFiles(String compactionInstantTime, SparkRDDWriteClient client, HoodieTable table,
+                                   HoodieWriteConfig cfg, String[] partitions, Set<HoodieFileGroupId> replacedFileIds) throws IOException {
+
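+    // run the pending compaction; the file groups in its plan are expected to have been replaced by now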
+    client.compact(compactionInstantTime);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
+    assertTrue(fileSliceList.stream().findAny().isPresent(), "Ensure latest file-slices are not empty");
+    assertFalse(fileSliceList.stream()
+            .anyMatch(fs -> replacedFileIds.contains(fs.getFileGroupId())),
+        "Replaced file groups should not show up in the latest file slices");
+
+    // reload the table so its timeline reflects the compaction commit
+    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
+    // verify the compaction commit is visible in the completed timeline
+    assertTrue(timeline.getInstants().anyMatch(instant -> compactionInstantTime.equals(instant.getTimestamp())),
+        "Compaction commit should be present in the completed commit timeline");
+    for (String partition: partitions) {
+      table.getSliceView().getLatestFileSlicesBeforeOrOn(partition, compactionInstantTime, true).forEach(fs -> {
+        // verify that all log files are merged
+        assertEquals(0, fs.getLogFiles().count());
+        assertTrue(fs.getBaseFile().isPresent());
+      });
+    }
+  }
+
   protected List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records, SparkRDDWriteClient client,
                                                     HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 8da1f3d..fd6bd83 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,11 +31,14 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -332,4 +336,54 @@
       executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
     }
   }
+
+  @Test
+  public void testCompactionOnReplacedFiles() throws Exception {
+    // Schedule a compaction, replace the file groups it covers via insert-overwrite, and ensure the compaction still completes successfully.
+    HoodieWriteConfig cfg = getConfig(true);
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true)) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      String firstInstantTime = "001";
+      String secondInstantTime = "004";
+      String compactionInstantTime = "005";
+      String replaceInstantTime = "006";
+      String fourthInstantTime = "007";
+
+      int numRecs = 2000;
+
+      List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
+      runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
+          new ArrayList<>());
+
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
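+      // schedule (but do not execute) the compaction so a pending compaction plan exists before the replace commit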
+      scheduleCompaction(compactionInstantTime, client, cfg);
+      metaClient.reloadActiveTimeline();
+      HoodieInstant pendingCompactionInstant =
+          metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
+      assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
+
+      Set<HoodieFileGroupId> fileGroupsBeforeReplace = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
+      // replace by using insertOverwrite
+      JavaRDD<HoodieRecord> replaceRecords = jsc.parallelize(dataGen.generateInserts(replaceInstantTime, numRecs), 1);
+      client.startCommitWithTime(replaceInstantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+      client.insertOverwrite(replaceRecords, replaceInstantTime);
+
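+      // reload the timeline and rebuild the table so the file-system view reflects the replace commit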
+      metaClient.reloadActiveTimeline();
+      hoodieTable = getHoodieTable(metaClient, cfg);
+      Set<HoodieFileGroupId> newFileGroups = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
+      // make sure earlier file groups are not visible
+      assertEquals(0, newFileGroups.stream().filter(fg -> fileGroupsBeforeReplace.contains(fg)).count());
+
+      // compaction should still run even though the file groups in its plan have been replaced
+      executeCompactionWithReplacedFiles(compactionInstantTime, client, hoodieTable, cfg, dataGen.getPartitionPaths(), fileGroupsBeforeReplace);
+    }
+  }
+
+  private Set<HoodieFileGroupId> getAllFileGroups(HoodieTable table, String[] partitions) {
+    return Arrays.stream(partitions).flatMap(partition -> table.getSliceView().getLatestFileSlices(partition)
+        .map(slice -> slice.getFileGroupId())).collect(Collectors.toSet());
+  }
 }