[HUDI-5718] Unsupported Operation Exception for compaction (#7874)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c6cd554..c0e6263 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,7 +21,9 @@
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,7 +39,6 @@
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
@@ -53,6 +54,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -113,7 +115,7 @@
   }
 
   @Test
-  public void testCompactionEmpty() throws Exception {
+  public void testCompactionEmpty() {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
@@ -169,41 +171,45 @@
       writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Update all the 100 records
-      HoodieTable table = HoodieSparkTable.create(config, context);
       newCommitTime = "101";
+      updateRecords(config, newCommitTime, records);
 
-      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
-      JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
-      HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
-      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+      assertLogFilesNumEqualsTo(config, 1);
 
-      writeClient.startCommitWithTime(newCommitTime);
-      writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
-      metaClient.reloadActiveTimeline();
-
-      // Verify that all data file has one log file
-      table = HoodieSparkTable.create(config, context);
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<FileSlice> groupedLogFiles =
-            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-        for (FileSlice fileSlice : groupedLogFiles) {
-          assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
-        }
-      }
-
-      // Do a compaction
-      table = HoodieSparkTable.create(config, context);
       String compactionInstantTime = "102";
-      table.scheduleCompaction(context, compactionInstantTime, Option.empty());
-      table.getMetaClient().reloadActiveTimeline();
-      HoodieData<WriteStatus> result = (HoodieData<WriteStatus>) table.compact(
-          context, compactionInstantTime).getWriteStatuses();
+      HoodieData<WriteStatus> result = compact(writeClient, compactionInstantTime);
 
-      // Verify that all partition paths are present in the WriteStatus result
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<WriteStatus> writeStatuses = result.collectAsList();
-        assertTrue(writeStatuses.stream()
-            .filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)).count() > 0);
+      verifyCompaction(result);
+    }
+  }
+
+  @Test
+  public void testSpillingWhenCompaction() throws Exception {
+    // insert 100 records
+    HoodieWriteConfig config = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withMemoryConfig(HoodieMemoryConfig.newBuilder()
+            .withMaxMemoryMaxSize(1L, 1L).build()) // force spill
+        .build();
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String newCommitTime = "100";
+      writeClient.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // trigger 2 updates following with compaction
+      for (int i = 1; i < 5; i += 2) {
+        // Update all the 100 records
+        newCommitTime = "10" + i;
+        updateRecords(config, newCommitTime, records);
+
+        assertLogFilesNumEqualsTo(config, 1);
+
+        HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));
+
+        verifyCompaction(result);
       }
     }
   }
@@ -212,4 +218,52 @@
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
   }
+
+  private void updateRecords(HoodieWriteConfig config, String newCommitTime, List<HoodieRecord> records) throws IOException {
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+    HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
+    JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+
+    writeClient.startCommitWithTime(newCommitTime);
+    writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
+    metaClient.reloadActiveTimeline();
+  }
+
+  /**
+   * Verify that all data file has {@code expected} number of log files.
+   *
+   * @param config   The writer config
+   * @param expected The expected number of log files
+   */
+  private void assertLogFilesNumEqualsTo(HoodieWriteConfig config, int expected) {
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<FileSlice> groupedLogFiles =
+          table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+      for (FileSlice fileSlice : groupedLogFiles) {
+        assertEquals(expected, fileSlice.getLogFiles().count(), "There should be " + expected + " log file written for every data file");
+      }
+    }
+  }
+
+  /**
+   * Do a compaction.
+   */
+  private HoodieData<WriteStatus> compact(SparkRDDWriteClient writeClient, String compactionInstantTime) {
+    writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime).getWriteStatuses();
+    return HoodieListData.eager(writeStatusJavaRDD.collect());
+  }
+
+  /**
+   * Verify that all partition paths are present in the WriteStatus result.
+   */
+  private void verifyCompaction(HoodieData<WriteStatus> result) {
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<WriteStatus> writeStatuses = result.collectAsList();
+      assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e5ce343..1a25695 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,7 +47,6 @@
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -215,7 +214,7 @@
   }
 
   public Map<String, HoodieRecord> getRecords() {
-    return Collections.unmodifiableMap(records);
+    return records;
   }
 
   public HoodieRecordType getRecordType() {