[HUDI-5718] Unsupported Operation Exception for compaction (#7874)
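
HoodieMergedLogRecordScanner#getRecords used to wrap its backing map in
Collections.unmodifiableMap, so any mutating call on the returned map during
compaction failed with UnsupportedOperationException. Return the backing map
directly instead. Also refactors TestHoodieCompactor into reusable helpers and
adds testSpillingWhenCompaction, which caps the merge/compaction memory so the
record map spills to disk during the test.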
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c6cd554..c0e6263 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,7 +21,9 @@
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
@@ -37,7 +39,6 @@
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
@@ -53,6 +54,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@@ -113,7 +115,7 @@
}
@Test
- public void testCompactionEmpty() throws Exception {
+ public void testCompactionEmpty() {
HoodieWriteConfig config = getConfig();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
@@ -169,41 +171,45 @@
writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records
- HoodieTable table = HoodieSparkTable.create(config, context);
newCommitTime = "101";
+ updateRecords(config, newCommitTime, records);
- List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
- JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
- HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
- JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+ assertLogFilesNumEqualsTo(config, 1);
- writeClient.startCommitWithTime(newCommitTime);
- writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
- metaClient.reloadActiveTimeline();
-
- // Verify that all data file has one log file
- table = HoodieSparkTable.create(config, context);
- for (String partitionPath : dataGen.getPartitionPaths()) {
- List<FileSlice> groupedLogFiles =
- table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
- for (FileSlice fileSlice : groupedLogFiles) {
- assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
- }
- }
-
- // Do a compaction
- table = HoodieSparkTable.create(config, context);
String compactionInstantTime = "102";
- table.scheduleCompaction(context, compactionInstantTime, Option.empty());
- table.getMetaClient().reloadActiveTimeline();
- HoodieData<WriteStatus> result = (HoodieData<WriteStatus>) table.compact(
- context, compactionInstantTime).getWriteStatuses();
+ HoodieData<WriteStatus> result = compact(writeClient, compactionInstantTime);
- // Verify that all partition paths are present in the WriteStatus result
- for (String partitionPath : dataGen.getPartitionPaths()) {
- List<WriteStatus> writeStatuses = result.collectAsList();
- assertTrue(writeStatuses.stream()
- .filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)).count() > 0);
+ verifyCompaction(result);
+ }
+ }
+
+ @Test
+ public void testSpillingWhenCompaction() throws Exception {
+ // insert 100 records
+ HoodieWriteConfig config = getConfigBuilder()
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withMemoryConfig(HoodieMemoryConfig.newBuilder()
+          .withMaxMemoryMaxSize(1L, 1L).build()) // force spilling to disk by capping the in-memory size
+ .build();
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+ String newCommitTime = "100";
+ writeClient.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+ JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+ writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // trigger two cycles of updates, each followed by a compaction
+ for (int i = 1; i < 5; i += 2) {
+ // Update all the 100 records
+ newCommitTime = "10" + i;
+ updateRecords(config, newCommitTime, records);
+
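+        // each cycle starts from a freshly compacted file slice, so exactly one new log file is expected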
+ assertLogFilesNumEqualsTo(config, 1);
+
+ HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));
+
+ verifyCompaction(result);
}
}
}
@@ -212,4 +218,52 @@
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
}
+
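+  /**
+   * Generates updates for the given records, tags them via the bloom index and
+   * upserts them with {@code newCommitTime}.
+   */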
+ private void updateRecords(HoodieWriteConfig config, String newCommitTime, List<HoodieRecord> records) throws IOException {
+ HoodieTable table = HoodieSparkTable.create(config, context);
+ List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
+ JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+ HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
+ JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+
+ writeClient.startCommitWithTime(newCommitTime);
+ writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
+ metaClient.reloadActiveTimeline();
+ }
+
+ /**
+   * Verify that each data file has {@code expected} number of log files.
+ *
+ * @param config The writer config
+ * @param expected The expected number of log files
+ */
+ private void assertLogFilesNumEqualsTo(HoodieWriteConfig config, int expected) {
+ HoodieTable table = HoodieSparkTable.create(config, context);
+ for (String partitionPath : dataGen.getPartitionPaths()) {
+ List<FileSlice> groupedLogFiles =
+ table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ for (FileSlice fileSlice : groupedLogFiles) {
+        assertEquals(expected, fileSlice.getLogFiles().count(), "There should be " + expected + " log file(s) written for every data file");
+ }
+ }
+ }
+
+ /**
+   * Schedules and runs a compaction at the given instant time and returns the write statuses.
+ */
+ private HoodieData<WriteStatus> compact(SparkRDDWriteClient writeClient, String compactionInstantTime) {
+ writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+ JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime).getWriteStatuses();
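+    // Materialize the write statuses eagerly so assertions can iterate them repeatedly.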
+ return HoodieListData.eager(writeStatusJavaRDD.collect());
+ }
+
+ /**
+ * Verify that all partition paths are present in the WriteStatus result.
+ */
+ private void verifyCompaction(HoodieData<WriteStatus> result) {
+    List<WriteStatus> writeStatuses = result.collectAsList();
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e5ce343..1a25695 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,7 +47,6 @@
import javax.annotation.concurrent.NotThreadSafe;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -215,7 +214,7 @@
}
public Map<String, HoodieRecord> getRecords() {
- return Collections.unmodifiableMap(records);
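+    // Return the backing map directly: the previous Collections.unmodifiableMap
+    // wrapper rejected mutating calls, surfacing as an UnsupportedOperationException
+    // during compaction (HUDI-5718).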
+ return records;
}
public HoodieRecordType getRecordType() {