feat: (HUDI-9714) Disallow changing merge related properties for Spark writes (#13959)
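
Starting with table version 9, merge-related properties (record merge mode, payload
class, and record merge strategy ID) are fixed at table creation and can no longer be
changed by subsequent Spark writes. HoodieWriterUtils.getKeyInTableConfig now maps the
writer-side keys to their HoodieTableConfig counterparts so the existing config
validation flags such changes, and StreamSync honors merge configs supplied as
overrides in cfg.configs when initializing a new table.

Illustrative sketch only (not part of this patch; df and basePath are placeholders,
the config keys are real): on a version-9 table, a follow-up write that tries to flip
the merge mode is expected to fail validation.

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode

    df.write.format("hudi")
      .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, "COMMIT_TIME_ORDERING") // differs from the table config
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .mode(SaveMode.Append)
      .save(basePath) // throws HoodieException listing the conflicting config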

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index c70aef2..c9807da 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.{RECORD_MERGE_MODE, SPARK_SQL_MERGE_INTO_PREPPED_KEY}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
@@ -217,28 +218,28 @@
   }
 
   /**
-   * This function adds specific rules to choose the right config key for payload class for version 9 tables.
-   *
-   * RULE 1: When
-   *   1. table version is 9,
-   *   2. writer key is a payload class key, and
-   *   3. table config has legacy payload class configured,
-   * then
-   *   return legacy payload class key.
-   *
-   * Basic rule:
-   *   return writer key.
+   * For table version 9 and above, this function maps a writer-provided config key to its corresponding
+   * `HoodieTableConfig` key, covering the payload class, record merge mode, and record merge strategy ID configs.
    */
-  def getPayloadClassConfigKeyFromTableConfig(key: String, tableConfig: HoodieConfig): String = {
-    if (tableConfig == null) {
+  def getKeyInTableConfig(key: String, tableConfig: HoodieConfig): String = {
+    if (tableConfig == null || tableConfig.getInt(HoodieTableConfig.VERSION) < HoodieTableVersion.NINE.versionCode()) {
       key
     } else {
-      if (tableConfig.getInt(HoodieTableConfig.VERSION) == HoodieTableVersion.NINE.versionCode()
-        && !StringUtils.isNullOrEmpty(tableConfig.getStringOrDefault(
-        HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, StringUtils.EMPTY_STRING).trim)) {
-        HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key
-      } else {
-        key
+      key match {
+        case k if k == HoodieTableConfig.PAYLOAD_CLASS_NAME.key || k == PAYLOAD_CLASS_NAME.key =>
+          val legacyPayload = tableConfig.getStringOrDefault(
+            HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, StringUtils.EMPTY_STRING
+          ).trim
+          if (!StringUtils.isNullOrEmpty(legacyPayload)) {
+            HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key
+          } else {
+            HoodieTableConfig.PAYLOAD_CLASS_NAME.key
+          }
+        case k if k == HoodieWriteConfig.RECORD_MERGE_MODE.key =>
+          HoodieTableConfig.RECORD_MERGE_MODE.key
+        case k if k == HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key =>
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key
+        case _ => key
       }
     }
   }
@@ -254,11 +255,7 @@
       val diffConfigs = StringBuilder.newBuilder
       params.foreach { case (key, value) =>
         if (!shouldIgnoreConfig(key, value, params, tableConfig)) {
-          val keyInTableConfig = if (key.equals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key))  {
-            getPayloadClassConfigKeyFromTableConfig(key, tableConfig)
-          } else {
-            key
-          }
+          val keyInTableConfig = getKeyInTableConfig(key, tableConfig)
           val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, keyInTableConfig)
           if (null != existingValue && !resolver(existingValue, value)) {
             diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(keyInTableConfig)}\n")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
index a8ec70d..ee9ebef 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java
@@ -23,21 +23,18 @@
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.util.JavaScalaConverters;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.util.Properties;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getMetaClientBuilder;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 class TestHoodieWriterUtils extends HoodieClientTestBase {
 
@@ -52,37 +49,71 @@
   }
 
   @Test
-  void testGetKeyInTableConfigTableVersion9PayloadClassKeyWithoutLegacyPayloadClass() {
-    HoodieConfig config = new HoodieConfig();
-    String result = HoodieWriterUtils.getPayloadClassConfigKeyFromTableConfig(
-        HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
-    Assertions.assertEquals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), result);
+  void testReturnsKeyWhenTableConfigIsNull() {
+    assertEquals("randomKey", HoodieWriterUtils.getKeyInTableConfig("randomKey", null));
   }
 
-  private static Stream<Arguments> testGetKeyInTableConfigTableVersion9PayloadClassKeyWithLegacyPayloadClass() {
-
-    Stream<Arguments> arguments = Stream.of(
-        arguments(HoodieTableVersion.EIGHT,
-            "org.apache.hudi.common.model.DefaultHoodieRecordPayload", "hoodie.compaction.payload.class"),
-        arguments(HoodieTableVersion.NINE,
-            "", "hoodie.compaction.payload.class"),
-        arguments(HoodieTableVersion.NINE,
-            "  ", "hoodie.compaction.payload.class"),
-        arguments(HoodieTableVersion.NINE,
-            "org.apache.hudi.common.model.DefaultHoodieRecordPayload", "hoodie.table.legacy.payload.class")
-    );
-    return arguments;
+  @Test
+  void testPayloadClassNameNotVersion9() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, "8");
+    String result = HoodieWriterUtils.getKeyInTableConfig(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+    assertEquals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), result);
   }
 
-  @ParameterizedTest
-  @MethodSource()
-  void testGetKeyInTableConfigTableVersion9PayloadClassKeyWithLegacyPayloadClass(
-      HoodieTableVersion tableVersion, String legacyPayloadToSet, String expectedKey) {
+  @Test
+  void testPayloadClassNameVersion9WithLegacyPayload() {
     HoodieConfig config = new HoodieConfig();
-    config.setValue(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, legacyPayloadToSet);
-    config.setValue(HoodieTableConfig.VERSION, String.valueOf(tableVersion.versionCode()));
-    String result = HoodieWriterUtils.getPayloadClassConfigKeyFromTableConfig(
-        HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
-    Assertions.assertEquals(expectedKey, result);
+    config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    config.setValue(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME, "com.example.LegacyPayload");
+    String result = HoodieWriterUtils.getKeyInTableConfig(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+    assertEquals(HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key(), result);
+  }
+
+  @Test
+  void testPayloadClassNameVersion9WithoutLegacyPayload() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
+    String result = HoodieWriterUtils.getKeyInTableConfig(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), config);
+    assertEquals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), result);
+  }
+
+  @Test
+  void testRecordMergeModeMappingWithVersion9() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION.key(), "9");
+    String result = HoodieWriterUtils.getKeyInTableConfig(HoodieWriteConfig.RECORD_MERGE_MODE.key(), config);
+    assertEquals(HoodieTableConfig.RECORD_MERGE_MODE.key(), result);
+  }
+
+  @Test
+  void testRecordMergeModeMappingWithVersion8() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION.key(), "8");
+    String result = HoodieWriterUtils.getKeyInTableConfig(HoodieWriteConfig.RECORD_MERGE_MODE.key(), config);
+    assertEquals(HoodieWriteConfig.RECORD_MERGE_MODE.key(), result);
+  }
+
+  @Test
+  void testRecordMergeStrategyIdMappingWithVersion9() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION.key(), "9");
+    String result = HoodieWriterUtils.getKeyInTableConfig(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), config);
+    assertEquals(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), result);
+  }
+
+  @Test
+  void testRecordMergeStrategyIdMappingWithVersion8() {
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(HoodieTableConfig.VERSION.key(), "8");
+    String result = HoodieWriterUtils.getKeyInTableConfig(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), config);
+    assertEquals(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), result);
+  }
+
+  @Test
+  void testFallbackToOriginalKey() {
+    HoodieConfig config = new HoodieConfig();
+    String result = HoodieWriterUtils.getKeyInTableConfig("my.custom.key", config);
+    assertEquals("my.custom.key", result);
   }
 }
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 326b6ce..38e107d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -152,7 +152,6 @@
     //          being queried by the Spark, and we currently have no way figuring out what these fields are, therefore
     //          we fallback to read whole row
     val overriddenOpts = defaultWriteOpts ++ Map(
-      HoodieWriteConfig.RECORD_MERGE_MODE.key() -> RecordMergeMode.CUSTOM.name(),
       HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
     )
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
index ec5dd8d..2f59db9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
@@ -122,8 +122,6 @@
       .option(HoodieTableConfig.ORDERING_FIELDS.key, "ts")
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
-      .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, "org.apache.hudi.common.model.PartialUpdateAvroPayload")
-      .option(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
       .mode(SaveMode.Append)
       .save(basePath)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index b63bf61..6e36702 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -811,7 +811,6 @@
          |  hoodie.enable.data.skipping = 'true',
          |  hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.AWSDmsAvroPayload',
          |  hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
-         |  hoodie.write.record.merge.mode = 'CUSTOM',
          |  hoodie.table.cdc.enabled = 'true',
          |  hoodie.table.cdc.supplemental.logging.mode = 'data_before_after'
          | )
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 1044f5d..537469b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -21,12 +21,14 @@
 
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.read.CustomPayloadForTesting
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.functional.CommonOptionUtils.getWriterReaderOpts
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.index.HoodieIndex.IndexType
@@ -38,7 +40,8 @@
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.StructType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
@@ -311,7 +314,8 @@
     "MERGE_ON_READ,8,COMMIT_TIME_ORDERING,GLOBAL_SIMPLE"))
   def testDeletesWithHoodieIsDeleted(tableType: HoodieTableType, tableVersion: Int, mergeMode: RecordMergeMode, indexType: IndexType): Unit = {
     var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
-    writeOpts = writeOpts ++ Map("hoodie.write.table.version" -> tableVersion.toString,
+    writeOpts = writeOpts ++ Map(
+      "hoodie.write.table.version" -> tableVersion.toString,
       "hoodie.datasource.write.table.type" -> tableType.name(),
       HoodieTableConfig.ORDERING_FIELDS.key() -> "ts",
       "hoodie.write.record.merge.mode" -> mergeMode.name(),
@@ -373,6 +377,104 @@
       .options(readOpts).load(basePath).select("_hoodie_record_key", "_hoodie_partition_path").count())
   }
 
+  @ParameterizedTest
+  @CsvSource(value = Array(
+    "8,EVENT_TIME_ORDERING",
+    "8,COMMIT_TIME_ORDERING",
+    "8,CUSTOM",
+    "9,EVENT_TIME_ORDERING",
+    "9,COMMIT_TIME_ORDERING",
+    "9,CUSTOM"))
+  def testImmutabilityOfMergeConfigs(tableVersion: Int,
+                                     mergeMode: RecordMergeMode): Unit = {
+    var (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+    val payloadClass = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
+      classOf[DefaultHoodieRecordPayload].getName
+    } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
+      classOf[OverwriteWithLatestAvroPayload].getName
+    } else {
+      classOf[CustomPayloadForTesting].getName
+    }
+    val strategyId = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
+      HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID
+    } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
+      HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+    } else {
+      HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID
+    }
+    writeOpts = writeOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> tableVersion.toString,
+      HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> "false",
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> payloadClass,
+      HoodieTableConfig.ORDERING_FIELDS.key -> "ts",
+      HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode.name())
+
+    // generate the inserts
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val inserts = DataSourceTestUtils.generateRandomRows(400)
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(
+      convertRowListToSeq(inserts)), structType)
+
+    // Do insert.
+    df.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    // Validate data quality.
+    val readDf = spark.read.format("hudi")
+      .options(readOpts)
+      .load(basePath)
+    assertEquals(400, readDf.count())
+    // Validate table configs.
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
+    assertEquals(tableVersion, metaClient.getTableConfig.getTableVersion.versionCode())
+    assertEquals(mergeMode, metaClient.getTableConfig.getRecordMergeMode)
+    assertEquals(strategyId, metaClient.getTableConfig.getRecordMergeStrategyId)
+
+    val df1 = df.limit(1)
+    val diffMergeMode = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
+      RecordMergeMode.COMMIT_TIME_ORDERING
+    } else {
+      RecordMergeMode.EVENT_TIME_ORDERING
+    }
+    if (tableVersion < 9) {
+      df1.write.format("hudi")
+        .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, diffMergeMode.name)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append)
+        .save(basePath)
+      val finalDf = spark.read.format("hudi")
+        .options(readOpts)
+        .load(basePath)
+      assertEquals(399, finalDf.count())
+    } else {
+      Assertions.assertThrows(classOf[HoodieException], () => {
+        df1.write.format("hudi")
+          .option(HoodieWriteConfig.RECORD_MERGE_MODE.key, diffMergeMode.name)
+          .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+          .mode(SaveMode.Append)
+          .save(basePath)
+      })
+    }
+    Assertions.assertThrows(classOf[HoodieException], () => {
+      df1.write.format("hudi")
+        .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, "any_other_payload")
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    })
+    Assertions.assertThrows(classOf[HoodieException], () => {
+      df1.write.format("hudi")
+        .option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, "any_other_strategy_id")
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    })
+  }
+
   def ingestNewBatch(tableType: HoodieTableType, recordsToUpdate: Integer, structType: StructType, inserts: java.util.List[Row],
                      writeOpts: Map[String, String]): Unit = {
     val toUpdate = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, recordsToUpdate), structType).collectAsList()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index 52fec90..cd442af 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -653,7 +653,6 @@
       "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
       "hoodie.datasource.write.partitionpath.field" -> "",
       "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.AWSDmsAvroPayload",
-      DataSourceWriteOptions.RECORD_MERGE_MODE.key() -> RecordMergeMode.CUSTOM.name(),
       "hoodie.table.cdc.enabled" -> "true",
       "hoodie.table.cdc.supplemental.logging.mode" -> "data_before_after"
     )
@@ -787,7 +786,6 @@
         .option("hoodie.datasource.write.operation", "upsert")
         .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
         .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
-        .option(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
         .option("hoodie.table.cdc.enabled", "true")
         .option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
         .mode(SaveMode.Append).save(basePath)
@@ -808,7 +806,6 @@
         .option("hoodie.datasource.write.operation", "upsert")
         .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
         .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
-        .option(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
         .option("hoodie.table.cdc.enabled", "true")
         .option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
         .mode(SaveMode.Append).save(basePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
index c277af2..2c345fd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
@@ -681,9 +681,7 @@
   test("Test MergeInto Partial Updates should fail with CUSTOM payload and merge mode") {
     withTempDir { tmp =>
       withSQLConf(
-        "hoodie.index.type" -> "GLOBAL_SIMPLE",
-        "hoodie.write.record.merge.mode" -> "CUSTOM",
-        "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload") {
+        "hoodie.index.type" -> "GLOBAL_SIMPLE") {
         val tableName = generateTableName
         spark.sql(
           s"""
@@ -700,6 +698,8 @@
              | TBLPROPERTIES (
              |   type = 'mor',
              |   primaryKey = 'record_key',
+             |   hoodie.write.record.merge.mode = 'CUSTOM',
+             |   hoodie.datasource.write.payload.class = 'org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload',
              |   preCombineField = 'ts')""".stripMargin)
 
         spark.sql(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 652776b..d2e3753 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -39,6 +39,7 @@
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -61,6 +62,7 @@
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieErrorTableConfig;
@@ -157,6 +159,7 @@
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
 import static org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName;
 import static org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException;
+import static org.apache.hudi.utilities.UtilHelpers.buildProperties;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
 import static org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_FORCE_SKIP;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -429,12 +432,27 @@
                             StorageConfiguration<?> storageConf) throws IOException {
     this.commitsTimelineOpt = Option.empty();
     this.allCommitsTimelineOpt = Option.empty();
+
+    RecordMergeMode mergeMode = cfg.recordMergeMode;
+    String mergeStrategyId = cfg.recordMergeStrategyId;
+    String payloadClass = cfg.payloadClassName;
+
+    // If any of the merge-related configs is present in cfg.configs,
+    // prefer it over the corresponding default from cfg.
+    Option<Triple<RecordMergeMode, String, String>> overridingMergeConfigs =
+        parseOverridingMergeConfigs(cfg.configs);
+    if (overridingMergeConfigs.isPresent()) {
+      mergeMode = overridingMergeConfigs.get().getLeft();
+      payloadClass = overridingMergeConfigs.get().getMiddle();
+      mergeStrategyId = overridingMergeConfigs.get().getRight();
+    }
+
     return tableBuilder.setTableType(cfg.tableType)
         .setTableName(cfg.targetTableName)
         .setArchiveLogFolder(TIMELINE_HISTORY_PATH.defaultValue())
-        .setPayloadClassName(cfg.payloadClassName)
-        .setRecordMergeStrategyId(cfg.recordMergeStrategyId)
-        .setRecordMergeMode(cfg.recordMergeMode)
+        .setPayloadClassName(payloadClass)
+        .setRecordMergeStrategyId(mergeStrategyId)
+        .setRecordMergeMode(mergeMode)
         .setBaseFileFormat(cfg.baseFileFormat)
         .setPartitionFields(partitionColumns)
         .setTableVersion(ConfigUtils.getIntWithAltKeys(props, WRITE_TABLE_VERSION))
@@ -460,6 +478,23 @@
         .initTable(storageConf, cfg.targetBasePath);
   }
 
+  static Option<Triple<RecordMergeMode, String, String>> parseOverridingMergeConfigs(List<String> configs) {
+    if (configs == null || configs.isEmpty()) {
+      return Option.empty();
+    }
+    TypedProperties overwrittenProps = buildProperties(configs);
+    if (overwrittenProps.containsKey(HoodieWriteConfig.RECORD_MERGE_MODE.key())
+        || overwrittenProps.containsKey(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key())
+        || overwrittenProps.containsKey(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key())) {
+      RecordMergeMode mergeMode = overwrittenProps.containsKey(HoodieWriteConfig.RECORD_MERGE_MODE.key())
+          ? RecordMergeMode.valueOf(overwrittenProps.getProperty(HoodieWriteConfig.RECORD_MERGE_MODE.key())) : null;
+      String mergeStrategyId = overwrittenProps.getString(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), null);
+      String payloadClass = overwrittenProps.getProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), null);
+      return Option.of(Triple.of(mergeMode, payloadClass, mergeStrategyId));
+    }
+    return Option.empty();
+  }
+
   /**
    * Run one round of delta sync and return new compaction instant if one got scheduled.
    */
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 4106260..c9614a7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -211,7 +211,7 @@
       Map<String, String> opts = new HashMap<>();
       opts.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), DefaultSparkRecordMerger.class.getName());
       opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
-      opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
+      opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name());
       opts.put(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID);
       for (Map.Entry<String, String> entry : opts.entrySet()) {
         hoodieConfig.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
index 832c655..850b15e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
@@ -21,6 +21,7 @@
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -28,6 +29,7 @@
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.storage.HoodieStorage;
@@ -46,12 +48,14 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -66,6 +70,7 @@
 import static org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -389,4 +394,61 @@
     expected.put(CHECKPOINT_RESET_KEY, "test-checkpoint");
     assertEquals(expected, result, "Should return default metadata when checkpoint is null");
   }
+
+  @Nested
+  class TestsWithoutSetup {
+    @Test
+    void testParseOverridingMergeConfigsWithEmptyConfigs() {
+      List<String> configs = Collections.emptyList();
+      Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(configs);
+      assertTrue(result.isEmpty());
+    }
+
+    @Test
+    void testParseOverridingMergeConfigsWithNullConfigs() {
+      Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(null);
+      assertTrue(result.isEmpty(), "Should return empty option when configs list is null");
+    }
+
+    @Test
+    void testParseOverridingMergeConfigsWithNonMergeConfigs() {
+      List<String> configs = Arrays.asList(
+          "hoodie.datasource.write.table.name=testTable",
+          "hoodie.datasource.write.operation=upsert"
+      );
+      Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(configs);
+      assertTrue(result.isEmpty());
+    }
+
+    @Test
+    void testParseOverridingMergeConfigsWithSomeMergeConfigs() {
+      List<String> configs = Arrays.asList(
+          "hoodie.datasource.write.table.name=testTable",
+          "hoodie.datasource.write.operation=upsert",
+          "hoodie.write.record.merge.mode=COMMIT_TIME_ORDERING"
+      );
+      Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(configs);
+      assertTrue(result.isPresent());
+      assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING, result.get().getLeft());
+      assertNull(result.get().getMiddle());
+      assertNull(result.get().getRight());
+    }
+
+    @Test
+    void testParseOverridingMergeConfigsWithMixedConfigs() {
+      List<String> configs = Arrays.asList(
+          "hoodie.datasource.write.table.name=testTable",
+          "hoodie.write.record.merge.mode=COMMIT_TIME_ORDERING",
+          "hoodie.datasource.write.operation=upsert",
+          "hoodie.write.record.merge.strategy.id=any_id",
+          "hoodie.datasource.write.payload.class=org.apache.hudi.common.model.OverwriteWithLatestPayload"
+      );
+      Option<Triple<RecordMergeMode, String, String>> result = StreamSync.parseOverridingMergeConfigs(configs);
+      assertTrue(result.isPresent());
+      Triple<RecordMergeMode, String, String> triple = result.get();
+      assertEquals(RecordMergeMode.COMMIT_TIME_ORDERING, triple.getLeft());
+      assertEquals("org.apache.hudi.common.model.OverwriteWithLatestPayload", triple.getMiddle());
+      assertEquals("any_id", triple.getRight());
+    }
+  }
 }