feat: (HUDI-9714) Disallow changing merge-related properties for Spark writes (#13959)
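
This makes merge-related write configs (record merge mode, payload class, and
record merge strategy ID) immutable for tables on table version 9 and above.
A rough sketch of the behavior being enforced, assuming `spark`, `df`, and
`basePath` refer to an existing table-version-9 table created with
EVENT_TIME_ORDERING (option keys mirror the new test in TestSparkDataSource):

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode

    // Attempting to switch the merge mode on a later write now fails with a HoodieException.
    df.write.format("hudi")
      .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, "COMMIT_TIME_ORDERING")
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .mode(SaveMode.Append)
      .save(basePath)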
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index c70aef2..c9807da 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -26,6 +26,7 @@
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.{RECORD_MERGE_MODE, SPARK_SQL_MERGE_INTO_PREPPED_KEY}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
@@ -217,28 +218,28 @@
}
/**
- * This function adds specific rules to choose the right config key for payload class for version 9 tables.
- *
- * RULE 1: When
- * 1. table version is 9,
- * 2. writer key is a payload class key, and
- * 3. table config has legacy payload class configured,
- * then
- * return legacy payload class key.
- *
- * Basic rule:
- * return writer key.
+ * For table version >= 9, this function finds the corresponding key in `HoodieTableConfig`
+ * for a key in `HoodieWriteConfig`, covering the payload class, record merge mode, and record merge strategy ID configs.
*/
- def getPayloadClassConfigKeyFromTableConfig(key: String, tableConfig: HoodieConfig): String = {
- if (tableConfig == null) {
+ def getKeyInTableConfig(key: String, tableConfig: HoodieConfig): String = {
+ if (tableConfig == null || tableConfig.getInt(HoodieTableConfig.VERSION) < HoodieTableVersion.NINE.versionCode()) {
key
} else {
- if (tableConfig.getInt(HoodieTableConfig.VERSION) == HoodieTableVersion.NINE.versionCode()
- && !StringUtils.isNullOrEmpty(tableConfig.getStringOrDefault(
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, StringUtils.EMPTY_STRING).trim)) {
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key
- } else {
- key
+ key match {
+ case k if k == HoodieTableConfig.PAYLOAD_CLASS_NAME.key || k == PAYLOAD_CLASS_NAME.key =>
+ val legacyPayload = tableConfig.getStringOrDefault(
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, StringUtils.EMPTY_STRING
+ ).trim
+ if (!StringUtils.isNullOrEmpty(legacyPayload)) {
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key
+ } else {
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key
+ }
+ case k if k == HoodieWriteConfig.RECORD_MERGE_MODE.key =>
+ HoodieTableConfig.RECORD_MERGE_MODE.key
+ case k if k == HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key =>
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key
+ case _ => key
}
}
}
@@ -254,11 +255,7 @@
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
if (!shouldIgnoreConfig(key, value, params, tableConfig)) {
- val keyInTableConfig = if (key.equals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key)) {
- getPayloadClassConfigKeyFromTableConfig(key, tableConfig)
- } else {
- key
- }
+ val keyInTableConfig = getKeyInTableConfig(key, tableConfig)
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, keyInTableConfig)
if (null != existingValue && !resolver(existingValue, value)) {
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(keyInTableConfig)}\n")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
index a8ec70d..ee9ebef 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
@@ -23,21 +23,18 @@
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.util.JavaScalaConverters;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.Properties;
-import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getMetaClientBuilder;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.junit.jupiter.api.Assertions.assertEquals;
class TestHoodieWriterUtils extends HoodieClientTestBase {
@@ -52,37 +49,71 @@
}
@Test
- void testGetKeyInTableConfigTableVersion9PayloadClassKeyWithoutLegacyPayloadClass() {
- HoodieConfig config = new HoodieConfig();
- String result = HoodieWriterUtils.getPayloadClassConfigKeyFromTableConfig(
- HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
- Assertions.assertEquals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), result);
+ void testReturnsKeyWhenTableConfigIsNull() {
+ assertEquals("randomKey", HoodieWriterUtils.getKeyInTableConfig("randomKey", null));
}
- private static Stream<Arguments> testGetKeyInTableConfigTableVersion9PayloadClassKeyWithLegacyPayloadClass() {
-
- Stream<Arguments> arguments = Stream.of(
- arguments(HoodieTableVersion.EIGHT,
- "org.apache.hudi.common.model.DefaultHoodieRecordPayload", "hoodie.compaction.payload.class"),
- arguments(HoodieTableVersion.NINE,
- "", "hoodie.compaction.payload.class"),
- arguments(HoodieTableVersion.NINE,
- " ", "hoodie.compaction.payload.class"),
- arguments(HoodieTableVersion.NINE,
- "org.apache.hudi.common.model.DefaultHoodieRecordPayload", "hoodie.table.legacy.payload.class")
- );
- return arguments;
+ @Test
+ void testPayloadClassNameNotVersion9() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION, "8");
+ String result = HoodieWriterUtils.getKeyInTableConfig(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+ assertEquals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), result);
}
- @ParameterizedTest
- @MethodSource()
- void testGetKeyInTableConfigTableVersion9PayloadClassKeyWithLegacyPayloadClass(
- HoodieTableVersion tableVersion, String legacyPayloadToSet, String expectedKey) {
+ @Test
+ void testPayloadClassNameVersion9WithLegacyPayload() {
HoodieConfig config = new HoodieConfig();
- config.setValue(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, legacyPayloadToSet);
- config.setValue(HoodieTableConfig.VERSION, String.valueOf(tableVersion.versionCode()));
- String result = HoodieWriterUtils.getPayloadClassConfigKeyFromTableConfig(
- HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
- Assertions.assertEquals(expectedKey, result);
+ config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+ config.setValue(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, "com.example.LegacyPayload");
+ String result = HoodieWriterUtils.getKeyInTableConfig(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+ assertEquals(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key(), result);
+ }
+
+ @Test
+ void testPayloadClassNameVersion9WithoutLegacyPayload() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+ String result = HoodieWriterUtils.getKeyInTableConfig(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+ assertEquals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), result);
+ }
+
+ @Test
+ void testRecordMergeModeMappingWithVersion9() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION.key(), "9");
+ String result = HoodieWriterUtils.getKeyInTableConfig(HoodieWriteConfig.RECORD_MERGE_MODE.key(), config);
+ assertEquals(HoodieTableConfig.RECORD_MERGE_MODE.key(), result);
+ }
+
+ @Test
+ void testRecordMergeModeMappingWithVersion8() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION.key(), "8");
+ String result = HoodieWriterUtils.getKeyInTableConfig(HoodieWriteConfig.RECORD_MERGE_MODE.key(), config);
+ assertEquals(HoodieWriteConfig.RECORD_MERGE_MODE.key(), result);
+ }
+
+ @Test
+ void testRecordMergeStrategyIdMappingWithVersion9() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION.key(), "9");
+ String result = HoodieWriterUtils.getKeyInTableConfig(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), config);
+ assertEquals(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), result);
+ }
+
+ @Test
+ void testRecordMergeStrategyIdMappingWithVersion8() {
+ HoodieConfig config = new HoodieConfig();
+ config.setValue(HoodieTableConfig.VERSION.key(), "8");
+ String result = HoodieWriterUtils.getKeyInTableConfig(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), config);
+ assertEquals(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), result);
+ }
+
+ @Test
+ void testFallbackToOriginalKey() {
+ HoodieConfig config = new HoodieConfig();
+ String result = HoodieWriterUtils.getKeyInTableConfig("my.custom.key", config);
+ assertEquals("my.custom.key", result);
}
}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 326b6ce..38e107d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -152,7 +152,6 @@
// being queried by the Spark, and we currently have no way figuring out what these fields are, therefore
// we fallback to read whole row
val overriddenOpts = defaultWriteOpts ++ Map(
- HoodieWriteConfig.RECORD_MERGE_MODE.key() -> RecordMergeMode.CUSTOM.name(),
HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
index ec5dd8d..2f59db9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
@@ -122,8 +122,6 @@
.option(HoodieTableConfig.ORDERING_FIELDS.key, "ts")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
- .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, "org.apache.hudi.common.model.PartialUpdateAvroPayload")
- .option(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
.mode(SaveMode.Append)
.save(basePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index b63bf61..6e36702 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -811,7 +811,6 @@
| hoodie.enable.data.skipping = 'true',
| hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.AWSDmsAvroPayload',
| hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
- | hoodie.write.record.merge.mode = 'CUSTOM',
| hoodie.table.cdc.enabled = 'true',
| hoodie.table.cdc.supplemental.logging.mode = 'data_before_after'
| )
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 1044f5d..537469b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -21,12 +21,14 @@
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.read.CustomPayloadForTesting
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieException
import org.apache.hudi.functional.CommonOptionUtils.getWriterReaderOpts
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.HoodieIndex.IndexType
@@ -38,7 +40,8 @@
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
@@ -311,7 +314,8 @@
"MERGE_ON_READ,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE"))
def testDeletesWithHoodieIsDeleted(tableType: HoodieTableType, tableVersion: Int, mergeMode: RecordMergeMode, indexType: IndexType): Unit = {
var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
- writeOpts = writeOpts ++ Map("hoodie.write.table.version" -> tableVersion.toString,
+ writeOpts = writeOpts ++ Map(
+ "hoodie.write.table.version" -> tableVersion.toString,
"hoodie.datasource.write.table.type" -> tableType.name(),
HoodieTableConfig.ORDERING_FIELDS.key() -> "ts",
"hoodie.write.record.merge.mode" -> mergeMode.name(),
@@ -373,6 +377,104 @@
.options(readOpts).load(basePath).select("_hoodie_record_key", "_hoodie_partition_path").count())
}
+ @ParameterizedTest
+ @CsvSource(value = Array(
+ "8,EVENT_TIME_ORDERING",
+ "8,COMMIT_TIME_ORDERING",
+ "8,CUSTOM",
+ "9,EVENT_TIME_ORDERING",
+ "9,COMMIT_TIME_ORDERING",
+ "9,CUSTOM"))
+ def testImmutabilityOfMergeConfigs(tableVersion: Int,
+ mergeMode: RecordMergeMode): Unit = {
+ var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+ val payloadClass = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
+ classOf[DefaultHoodieRecordPayload].getName
+ } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
+ classOf[OverwriteWithLatestAvroPayload].getName
+ } else {
+ classOf[CustomPayloadForTesting].getName
+ }
+ val strategyId = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
+ HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
+ } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
+ HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+ } else {
+ HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID
+ }
+ writeOpts = writeOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key -> tableVersion.toString,
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> "false",
+ HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> payloadClass,
+ HoodieTableConfig.ORDERING_FIELDS.key -> "ts",
+ HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode.name())
+
+ // generate the inserts
+ val schema = DataSourceTestUtils.getStructTypeExampleSchema
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val inserts = DataSourceTestUtils.generateRandomRows(400)
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(
+ convertRowListToSeq(inserts)), structType)
+
+ // Do insert.
+ df.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ // Validate data quality.
+ val readDf = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertEquals(400, readDf.count())
+ // Validate table configs.
+ val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+ assertEquals(tableVersion, metaClient.getTableConfig.getTableVersion.versionCode())
+ assertEquals(mergeMode, metaClient.getTableConfig.getRecordMergeMode)
+ assertEquals(strategyId, metaClient.getTableConfig.getRecordMergeStrategyId)
+
+ val df1 = df.limit(1)
+ val diffMergeMode = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
+ RecordMergeMode.COMMIT_TIME_ORDERING
+ } else {
+ RecordMergeMode.EVENT_TIME_ORDERING
+ }
+ if (tableVersion < 9) {
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, diffMergeMode.name)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val finalDf = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertEquals(399, finalDf.count())
+ } else {
+ Assertions.assertThrows(classOf[HoodieException], () => {
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, "any_other_payload")
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ })
+ }
+ Assertions.assertThrows(classOf[HoodieException], () => {
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, "any_other_payload")
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ })
+ Assertions.assertThrows(classOf[HoodieException], () => {
+ df1.write.format("hudi")
+ .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, "any_other_strategy_id")
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ })
+ }
+
def ingestNewBatch(tableType: HoodieTableType, recordsToUpdate: Integer, structType: StructType, inserts: java.util.List[Row],
writeOpts: Map[String, String]): Unit = {
val toUpdate = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, recordsToUpdate), structType).collectAsList()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index 52fec90..cd442af 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -653,7 +653,6 @@
"hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
"hoodie.datasource.write.partitionpath.field" -> "",
"hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.AWSDmsAvroPayload",
- DataSourceWriteOptions.RECORD_MERGE_MODE.key() -> RecordMergeMode.CUSTOM.name(),
"hoodie.table.cdc.enabled" -> "true",
"hoodie.table.cdc.supplemental.logging.mode" -> "data_before_after"
)
@@ -787,7 +786,6 @@
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
.option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
- .option(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
.option("hoodie.table.cdc.enabled", "true")
.option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
.mode(SaveMode.Append).save(basePath)
@@ -808,7 +806,6 @@
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
.option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
- .option(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
.option("hoodie.table.cdc.enabled", "true")
.option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
.mode(SaveMode.Append).save(basePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
index c277af2..2c345fd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
@@ -681,9 +681,7 @@
test("Test MergeInto Partial Updates should fail with CUSTOM payload and merge mode") {
withTempDir { tmp =>
withSQLConf(
- "hoodie.index.type" -> "GLOBAL_SIMPLE",
- "hoodie.write.record.merge.mode" -> "CUSTOM",
- "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload") {
+ "hoodie.index.type" -> "GLOBAL_SIMPLE") {
val tableName = generateTableName
spark.sql(
s"""
@@ -700,6 +698,8 @@
| TBLPROPERTIES (
| type = 'mor',
| primaryKey = 'record_key',
+ | hoodie.write.record.merge.mode = 'CUSTOM',
+ | hoodie.datasource.write.payload.class = 'org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload',
| preCombineField = 'ts')""".stripMargin)
spark.sql(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 652776b..d2e3753 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -39,6 +39,7 @@
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -61,6 +62,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
@@ -157,6 +159,7 @@
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
import static org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName;
import static org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException;
+import static org.apache.hudi.utilities.UtilHelpers.buildProperties;
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
import static org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -429,12 +432,27 @@
StorageConfiguration<?> storageConf) throws IOException {
this.commitsTimelineOpt = Option.empty();
this.allCommitsTimelineOpt = Option.empty();
+
+ RecordMergeMode mergeMode = cfg.recordMergeMode;
+ String mergeStrategyId = cfg.recordMergeStrategyId;
+ String payloadClass = cfg.payloadClassName;
+
+ // When at least one of the merge-related configs is set in cfg.configs,
+ // we should use those values since they override the defaults.
+ Option<Triple<RecordMergeMode, String, String>> overridingMergeConfigs =
+ parseOverridingMergeConfigs(cfg.configs);
+ if (overridingMergeConfigs.isPresent()) {
+ mergeMode = overridingMergeConfigs.get().getLeft();
+ payloadClass = overridingMergeConfigs.get().getMiddle();
+ mergeStrategyId = overridingMergeConfigs.get().getRight();
+ }
+
return tableBuilder.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue())
- .setPayloadClassName(cfg.payloadClassName)
- .setRecordMergeStrategyId(cfg.recordMergeStrategyId)
- .setRecordMergeMode(cfg.recordMergeMode)
+ .setPayloadClassName(payloadClass)
+ .setRecordMergeStrategyId(mergeStrategyId)
+ .setRecordMergeMode(mergeMode)
.setBaseFileFormat(cfg.baseFileFormat)
.setPartitionFields(partitionColumns)
.setTableVersion(ConfigUtils.getIntWithAltKeys(props, WRITE_TABLE_VERSION))
@@ -460,6 +478,23 @@
.initTable(storageConf, cfg.targetBasePath);
}
+ static Option<Triple<RecordMergeMode, String, String>> parseOverridingMergeConfigs(List<String> configs) {
+ if (configs == null || configs.isEmpty()) {
+ return Option.empty();
+ }
+ TypedProperties overwrittenProps = buildProperties(configs);
+ if (overwrittenProps.containsKey(HoodieWriteConfig.RECORD_MERGE_MODE.key())
+ || overwrittenProps.containsKey(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key())
+ || overwrittenProps.containsKey(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key())) {
+ RecordMergeMode mergeMode = overwrittenProps.containsKey(HoodieWriteConfig.RECORD_MERGE_MODE.key())
+ ? RecordMergeMode.valueOf(overwrittenProps.getProperty(HoodieWriteConfig.RECORD_MERGE_MODE.key())) : null;
+ String mergeStrategyId = overwrittenProps.getString(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), null);
+ String payloadClass = overwrittenProps.getProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), null);
+ return Option.of(Triple.of(mergeMode, payloadClass, mergeStrategyId));
+ }
+ return Option.empty();
+ }
+
/**
* Run one round of delta sync and return new compaction instant if one got scheduled.
*/
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 4106260..c9614a7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -211,7 +211,7 @@
Map<String, String> opts = new HashMap<>();
opts.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), DefaultSparkRecordMerger.class.getName());
opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
- opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
+ opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name());
opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
for (Map.Entry<String, String> entry : opts.entrySet()) {
hoodieConfig.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
index 832c655..850b15e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
@@ -21,6 +21,7 @@
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -28,6 +29,7 @@
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
@@ -46,12 +48,14 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -66,6 +70,7 @@
import static org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -389,4 +394,61 @@
expected.put(CHECKPOINT_RESET_KEY, "test-checkpoint");
assertEquals(expected, result, "Should return default metadata when checkpoint is null");
}
+
+ @Nested
+ class TestsWithoutSetup {
+ @Test
+ void testParseOverridingMergeConfigsWithEmptyConfigs() {
+ List<String> configs = Collections.emptyList();
+ Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(configs);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testParseOverridingMergeConfigsWithNullConfigs() {
+ Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(null);
+ assertTrue(result.isEmpty(), "Should return empty option when configs list is null");
+ }
+
+ @Test
+ void testParseOverridingMergeConfigsWithNonMergeConfigs() {
+ List<String> configs = Arrays.asList(
+ "hoodie.datasource.write.table.name=testTable",
+ "hoodie.datasource.write.operation=upsert"
+ );
+ Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(configs);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testParseOverridingMergeConfigsWithSomeMergeConfigs() {
+ List<String> configs = Arrays.asList(
+ "hoodie.datasource.write.table.name=testTable",
+ "hoodie.datasource.write.operation=upsert",
+ "hoodie.write.record.merge.mode=COMMIT_TIME_ORDERING"
+ );
+ Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(configs);
+ assertTrue(result.isPresent());
+ assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING, result.get().getLeft());
+ assertNull(result.get().getMiddle());
+ assertNull(result.get().getRight());
+ }
+
+ @Test
+ void testParseOverridingMergeConfigsWithMixedConfigs() {
+ List<String> configs = Arrays.asList(
+ "hoodie.datasource.write.table.name=testTable",
+ "hoodie.write.record.merge.mode=COMMIT_TIME_ORDERING",
+ "hoodie.datasource.write.operation=upsert",
+ "hoodie.write.record.merge.strategy.id=any_id",
+ "hoodie.datasource.write.payload.class=org.apache.hudi.common.model.OverwriteWithLatestPayload"
+ );
+ Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(configs);
+ assertTrue(result.isPresent());
+ Triple<RecordMergeMode, String, String> triple = result.get();
+ assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING, triple.getLeft());
+ assertEquals("org.apache.hudi.common.model.OverwriteWithLatestPayload", triple.getMiddle());
+ assertEquals("any_id", triple.getRight());
+ }
+ }
}