test: Add lowering order and higher ordering values to payloads inn test (#14001)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index 0db2323..76e23f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -21,14 +21,15 @@
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, ORDERING_FIELDS, RECORDKEY_FIELD, TABLE_TYPE}
-import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig, TypedProperties}
 import org.apache.hudi.common.model.{AWSDmsAvroPayload, DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger, HoodieTableType, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
 import org.apache.hudi.common.model.debezium.{DebeziumConstants, MySqlDebeziumAvroPayload, PostgresDebeziumAvroPayload}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 
 import org.apache.spark.sql.SaveMode
@@ -45,10 +46,11 @@
    */
   @ParameterizedTest
   @MethodSource(Array("providePayloadClassTestCases"))
-  def testMergerBuiltinPayloadUpgradePath(tableType: String,
+  def testMergerBuiltinPayloadUpgradeDowngradePath(tableType: String,
                                           payloadClazz: String,
                                           useOpAsDeleteStr: String,
-                                          expectedConfigs: Map[String, String]): Unit = {
+                                          expectedConfigs: Map[String, String],
+                                          expectedDowngradeConfigs: Map[String, String]): Unit = {
     val useOpAsDelete = useOpAsDeleteStr.equals("true")
     val deleteOpts: Map[String, String] = if (useOpAsDelete) {
       Map(DefaultHoodieRecordPayload.DELETE_KEY -> "Op", DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
@@ -59,6 +61,14 @@
       HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
       HoodieMetadataConfig.ENABLE.key() -> "false") ++ deleteOpts
 
+    // Common table service configurations
+    val serviceOpts: Map[String, String] = Map(
+      HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2",
+      HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key() -> "512000",
+      HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key() -> "512000"
+    )
+
     val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq",
       DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME, DebeziumConstants.FLATTENED_OP_COL_NAME)
     // 1. Add an insert.
@@ -67,33 +77,44 @@
       (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1, "i"),
       (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
       (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
-      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"),
-      (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))
     val inserts = spark.createDataFrame(data).toDF(columns: _*)
-    val originalOrderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
-      "_event_seq"
+    val originalOrderingFields: Option[String] = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      Some("_event_seq")
     } else if (payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
-      "_event_lsn"
+      Some("_event_lsn")
+    } else if (payloadClazz.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+      || payloadClazz.equals(classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName)
+      || payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+      None  // Don't set ordering fields for commit-time ordering payloads
     } else {
-      "ts"
+      Some("ts")
     }
     val expectedOrderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
       "_event_bin_file,_event_pos"
     } else if (payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
       "_event_lsn"
     } else {
-      originalOrderingFields
+      originalOrderingFields.orNull
     }
-    inserts.write.format("hudi").
-      option(RECORDKEY_FIELD.key(), "_event_lsn").
-      option(HoodieTableConfig.ORDERING_FIELDS.key(), originalOrderingFields).
-      option(TABLE_TYPE.key(), tableType).
+    val insertWriter = inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "_event_lsn")
+    val writerWithOrdering = originalOrderingFields match {
+      case Some(fields) => insertWriter.option(HoodieTableConfig.ORDERING_FIELDS.key(), fields)
+      case None => insertWriter
+    }
+    writerWithOrdering.option(TABLE_TYPE.key(), tableType).
       option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      option(OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
       option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
+      option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), "2048").
+      option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1024").
+      options(serviceOpts).
       options(opts).
       mode(SaveMode.Overwrite).
       save(basePath)
+
     // Verify table was created successfully
     var metaClient = HoodieTableMetaClient.builder()
       .setBasePath(basePath)
@@ -108,46 +129,59 @@
       (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1, "i"),
       (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
       (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"))
-    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
-    firstUpdate.write.format("hudi").
-      option(OPERATION.key(), "upsert").
-      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
-      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
-      option(HoodieTableConfig.ORDERING_FIELDS.key(), originalOrderingFields).
-      options(opts).
-      mode(SaveMode.Append).
-      save(basePath)
+    performUpsert(firstUpdateData, columns, serviceOpts, opts, basePath,
+      tableVersion = Some("8"), orderingFields = originalOrderingFields)
     // Validate table version.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode())
     val firstUpdateInstantTime = metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
 
-    // 3. Add an update. This is expected to trigger the upgrade
+    // 3. Add mixed ordering test data to validate proper ordering handling
+    // This tests that updates/deletes with lower ordering values are ignored
+    // while higher ordering values are applied
+    val mixedOrderingData = Seq(
+      // Update rider-B with LOWER ordering - EVENT_TIME: IGNORED, COMMIT_TIME: APPLIED
+      (8, 2L, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
+      // Update rider-C with HIGHER ordering - should be APPLIED
+      (11, 3L, "rider-CC", "driver-CC", 35.00, "u", "15.1", 15, 1, "u"),
+      // Update rider-C with LOWER ordering - should be IGNORED (rider-C has ts=10 originally)
+      (8, 3L, "rider-CC", "driver-CC", 30.00, "u", "8.1", 8, 1, "u"),
+      // Delete rider-E with LOWER ordering - should be IGNORED (rider-E has ts=10 originally)
+      (9, 5L, "rider-EE", "driver-EE", 17.85, "D", "9.1", 9, 1, "d"))
+    performUpsert(mixedOrderingData, columns, serviceOpts, opts, basePath,
+      tableVersion = Some("8"), orderingFields = originalOrderingFields)
+    // Validate table version is still 8 after mixed ordering batch
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
+    assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode())
+
+    // 4. Add an update. This is expected to trigger the upgrade
+    val compactionEnabled = if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) "true" else "false"
     val secondUpdateData = Seq(
       (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"),
       // For rider-DD we purposefully deviate and set the _event_seq to be less than the _event_bin_file and _event_pos
       // so that the test will fail if _event_seq is still used for ordering
       (9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
       (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
-    val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
-    secondUpdate.write.format("hudi").
-      option(OPERATION.key(), "upsert").
-      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "9").
-      options(opts).
-      mode(SaveMode.Append).
-      save(basePath)
+    performUpsert(secondUpdateData, columns, serviceOpts, opts, basePath,
+      tableVersion = Some("9"), compactionEnabled = compactionEnabled)
     // Validate table version as 9.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
     assertEquals(payloadClazz, metaClient.getTableConfig.getLegacyPayloadClass)
     assertEquals(isCDCPayload(payloadClazz) || useOpAsDelete,
       metaClient.getTableConfig.getProps.containsKey(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY))
-    assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(""))
-    val compactionInstants = metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.getInstants
-    val foundCompaction = compactionInstants.stream().anyMatch(i => i.getAction.equals("commit"))
-    assertTrue(foundCompaction)
+    assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(null))
 
-    // 4. Add a trivial update to trigger payload class mismatch.
+    // 5. Add a trivial update to trigger payload class mismatch.
     val thirdUpdateData = Seq(
       (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"))
     val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
@@ -164,7 +198,7 @@
       }
     }
 
-    // 5. Add a delete.
+    // 6. Add a delete.
     val fourthUpdateData = Seq(
       (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"),
       (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
@@ -173,10 +207,28 @@
       option(OPERATION.key(), "delete").
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
       option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+      options(serviceOpts).
       mode(SaveMode.Append).
       save(basePath)
 
-    // 6. Validate.
+    // 7. Add INSERT operation.
+    val insertData = Seq(
+      (13, 6L, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+      (13, 7L, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+    performInsert(insertData, columns, serviceOpts, basePath)
+
+    // Final validation of table management operations after all writes
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
+    validateTableManagementOps(metaClient, tableType,
+      expectCompaction = tableType.equals(HoodieTableType.MERGE_ON_READ.name()),
+      expectClustering = true,   // Enable clustering validation with lowered thresholds
+      expectCleaning = false,
+      expectArchival = false)
+
+    // 8. Validate.
     // Validate table configs.
     tableConfig = metaClient.getTableConfig
     expectedConfigs.foreach { case (key, expectedValue) =>
@@ -206,6 +258,9 @@
         expectedTimeTravelDf.except(timeTravelDf).isEmpty
           && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
     }
+
+    // 9-10. Perform downgrade and validation
+    performDowngradeAndValidate(metaClient, basePath, expectedDowngradeConfigs, expectedData, columns, serviceOpts)
   }
 
   @ParameterizedTest
@@ -213,7 +268,8 @@
   def testMergerBuiltinPayloadFromTableCreationPath(tableType: String,
                                                     payloadClazz: String,
                                                     useOpAsDeleteStr: String,
-                                                    expectedConfigs: Map[String, String]): Unit = {
+                                                    expectedConfigs: Map[String, String],
+                                                    expectedDowngradeConfigs: Map[String, String]): Unit = {
     val useOpAsDelete = useOpAsDeleteStr.equals("true")
     val deleteOpts: Map[String, String] = if (useOpAsDelete) {
       Map(DefaultHoodieRecordPayload.DELETE_KEY -> "Op", DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
@@ -223,6 +279,15 @@
     val opts: Map[String, String] = Map(
       HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
       HoodieMetadataConfig.ENABLE.key() -> "false") ++ deleteOpts
+
+    // Common table service configurations
+    val serviceOpts: Map[String, String] = Map(
+      HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2",
+      HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key() -> "512000",
+      HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key() -> "512000"
+    )
+
     val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq",
       DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME, DebeziumConstants.FLATTENED_OP_COL_NAME)
     // 1. Add an insert.
@@ -231,27 +296,37 @@
       (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1, "i"),
       (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
       (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
-      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"),
-      (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))
     val inserts = spark.createDataFrame(data).toDF(columns: _*)
-    val originalOrderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
-      "_event_seq"
+    val originalOrderingFields: Option[String] = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      Some("_event_seq")
+    } else if (payloadClazz.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+      || payloadClazz.equals(classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName)
+      || payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+      None  // Don't set ordering fields for commit-time ordering payloads
     } else {
-      "ts"
+      Some("ts")
     }
     val expectedOrderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
       "_event_bin_file,_event_pos"
     } else if (payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
       "_event_lsn"
     } else {
-      originalOrderingFields
+      originalOrderingFields.orNull
     }
-    inserts.write.format("hudi").
-      option(RECORDKEY_FIELD.key(), "_event_lsn").
-      option(ORDERING_FIELDS.key(), originalOrderingFields).
-      option(TABLE_TYPE.key(), tableType).
+    val insertWriter = inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "_event_lsn")
+    val writerWithOrdering = originalOrderingFields match {
+      case Some(fields) => insertWriter.option(ORDERING_FIELDS.key(), fields)
+      case None => insertWriter
+    }
+    writerWithOrdering.option(TABLE_TYPE.key(), tableType).
       option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      option(OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), "2048").
+      option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1024").
+      options(serviceOpts).
       options(opts).
       mode(SaveMode.Overwrite).
       save(basePath)
@@ -278,21 +353,38 @@
       (11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1, "i"),
       (12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
       (11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"))
-    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
-    firstUpdate.write.format("hudi").
-      option(OPERATION.key(), "upsert").
-      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
-      mode(SaveMode.Append).
-      save(basePath)
+    performUpsert(firstUpdateData, columns, serviceOpts, Map.empty, basePath)
     // Validate table version.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
     // validate ordering fields
-    assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(""))
+    assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(null))
     val firstUpdateInstantTime = metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
 
+    // 3. Add mixed ordering test data to validate proper ordering handling
+    // This tests that updates/deletes with lower ordering values are ignored
+    // while higher ordering values are applied
+    val mixedOrderingData = Seq(
+      // Update rider-B with LOWER ordering - EVENT_TIME: IGNORED, COMMIT_TIME: APPLIED
+      (8, 2L, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
+      // Update rider-C with HIGHER ordering - should be APPLIED
+      (11, 3L, "rider-CC", "driver-CC", 35.00, "u", "15.1", 15, 1, "u"),
+      // Update rider-C with LOWER ordering - should be IGNORED (rider-C has ts=10 originally)
+      (8, 3L, "rider-CC", "driver-CC", 30.00, "u", "8.1", 8, 1, "u"),
+      // Delete rider-E with LOWER ordering - should be IGNORED (rider-E has ts=10 originally)
+      (9, 5L, "rider-EE", "driver-EE", 17.85, "D", "9.1", 9, 1, "d"))
+    performUpsert(mixedOrderingData, columns, serviceOpts, opts, basePath)
+    // Validate table version is still 9 after mixed ordering batch
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
+    assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
 
-    // 3. Add an update. This is expected to trigger the upgrade
+    // 4. Add an update. This is expected to trigger the upgrade
     val compactionEnabled = if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
       "true"
     } else {
@@ -304,22 +396,17 @@
       // so that the test will fail if _event_seq is still used for ordering
       (9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
       (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
-    val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
-    secondUpdate.write.format("hudi").
-      option(OPERATION.key(), "upsert").
-      option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled).
-      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
-      mode(SaveMode.Append).
-      save(basePath)
+    performUpsert(secondUpdateData, columns, serviceOpts, Map.empty, basePath,
+      compactionEnabled = compactionEnabled)
     // Validate table version as 9.
-    metaClient = HoodieTableMetaClient.reload(metaClient)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
     assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
     assertEquals(payloadClazz, metaClient.getTableConfig.getLegacyPayloadClass)
-    val compactionInstants = metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.getInstants
-    val foundCompaction = compactionInstants.stream().anyMatch(i => i.getAction.equals("commit"))
-    assertTrue(foundCompaction)
 
-    // 4. Add a trivial update to trigger payload class mismatch.
+    // 5. Add a trivial update to trigger payload class mismatch.
     val thirdUpdateData = Seq(
       (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"))
     val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
@@ -336,7 +423,7 @@
       }
     }
 
-    // 5. Add a delete.
+    // 6. Add a delete.
     val fourthUpdateData = Seq(
       (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"),
       (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
@@ -345,10 +432,28 @@
       option(OPERATION.key(), "delete").
       option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
       option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+      options(serviceOpts).
       mode(SaveMode.Append).
       save(basePath)
 
-    // 6. Validate.
+    // 7. Add INSERT operation.
+    val insertData = Seq(
+      (13, 6L, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+      (13, 7L, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+    performInsert(insertData, columns, serviceOpts, basePath)
+
+    // Final validation of table management operations after all writes
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
+    validateTableManagementOps(metaClient, tableType,
+      expectCompaction = tableType.equals(HoodieTableType.MERGE_ON_READ.name()),
+      expectClustering = true,   // Enable clustering validation with lowered thresholds
+      expectCleaning = false,
+      expectArchival = false)
+
+    // 8. Validate.
     // Validate table configs again.
     tableConfig = metaClient.getTableConfig
     expectedConfigs.foreach { case (key, expectedValue) =>
@@ -376,6 +481,9 @@
     assertTrue(
       expectedTimeTravelDf.except(timeTravelDf).isEmpty
         && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
+
+    // 9-10. Perform downgrade and validation
+    performDowngradeAndValidate(metaClient, basePath, expectedDowngradeConfigs, expectedData, columns, serviceOpts)
   }
 
   def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
@@ -386,59 +494,294 @@
       .build()
   }
 
+  /**
+   * Helper method to perform upsert operations with configurable options
+   */
+  def performUpsert(data: Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)],
+                    columns: Seq[String],
+                    serviceOpts: Map[String, String],
+                    opts: Map[String, String],
+                    basePath: String,
+                    tableVersion: Option[String] = None,
+                    orderingFields: Option[String] = None,
+                    compactionEnabled: String = "false",
+                    compactionNumDeltaCommits: String = "1"): Unit = {
+    val dataFrame = spark.createDataFrame(data).toDF(columns: _*)
+    val writer = dataFrame.write.format("hudi")
+      .option(OPERATION.key(), "upsert")
+      .option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled)
+      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), compactionNumDeltaCommits)
+      .options(serviceOpts)
+
+    val writerWithVersion = tableVersion match {
+      case Some(version) => writer.option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), version)
+      case None => writer
+    }
+
+    val writerWithOrdering = orderingFields match {
+      case Some(fields) => writerWithVersion.option(HoodieTableConfig.ORDERING_FIELDS.key(), fields)
+      case None => writerWithVersion
+    }
+
+    writerWithOrdering.options(opts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+  }
+
+  /**
+   * Helper method to perform insert operations
+   */
+  def performInsert(data: Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)],
+                    columns: Seq[String],
+                    serviceOpts: Map[String, String],
+                    basePath: String): Unit = {
+    val dataFrame = spark.createDataFrame(data).toDF(columns: _*)
+    dataFrame.write.format("hudi")
+      .option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false")
+      .options(serviceOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+  }
+
+  /**
+   * Helper method to perform downgrade from v9 to v8 and validate the results
+   */
+  def performDowngradeAndValidate(metaClient: HoodieTableMetaClient,
+                                   basePath: String,
+                                   expectedDowngradeConfigs: Map[String, String],
+                                   expectedData: Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)],
+                                   columns: Seq[String],
+                                   serviceOpts: Map[String, String]): Unit = {
+    // 9. Downgrade from v9 to v8
+    val writeConfig = HoodieWriteConfig.newBuilder()
+      .withPath(basePath)
+      .withSchema(spark.read.format("hudi").load(basePath).schema.json)
+      .build()
+
+    new UpgradeDowngrade(metaClient, writeConfig, context, SparkUpgradeDowngradeHelper.getInstance)
+      .run(HoodieTableVersion.EIGHT, null)
+
+    // Reload metaClient to get updated table config
+    val downgradedMetaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storageConf())
+      .build()
+
+    // Validate table version is 8
+    assertEquals(8, downgradedMetaClient.getTableConfig.getTableVersion.versionCode())
+
+    // Validate downgrade configs
+    val downgradedTableConfig = downgradedMetaClient.getTableConfig
+    expectedDowngradeConfigs.foreach { case (key, expectedValue) =>
+      if (expectedValue != null) {
+        assertEquals(expectedValue, downgradedTableConfig.getString(key), s"Config $key should be $expectedValue after downgrade")
+      } else {
+        assertFalse(downgradedTableConfig.contains(key), s"Config $key should not be present after downgrade")
+      }
+    }
+
+    // 10. Add post-downgrade upsert to verify table functionality
+    performUpsert(
+      Seq((14, 10L, "rider-Z", "driver-Z", 45.50, "i", "14.1", 14, 1, "i")),
+      columns, serviceOpts, Map.empty, basePath)
+
+    // Validate data consistency after downgrade including new row
+    val downgradeDf = spark.read.format("hudi").load(basePath)
+    val downgradeFinalDf = downgradeDf.select("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME, DebeziumConstants.FLATTENED_OP_COL_NAME).sort("_event_lsn")
+
+    // Create expected data including the new post-downgrade row
+    val expectedDataWithNewRow = expectedData ++ Seq((14, 10L, "rider-Z", "driver-Z", 45.50, "i", "14.1", 14, 1, "i"))
+    val expectedDfWithNewRow = spark.createDataFrame(spark.sparkContext.parallelize(expectedDataWithNewRow)).toDF(columns: _*).sort("_event_lsn")
+    assertTrue(expectedDfWithNewRow.except(downgradeFinalDf).isEmpty && downgradeFinalDf.except(expectedDfWithNewRow).isEmpty,
+      "Data should remain consistent after downgrade including new row")
+  }
+
   def getExpectedResultForSnapshotQuery(payloadClazz: String, usesDeleteMarker: Boolean): Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)] = {
     if (!isCDCPayload(payloadClazz) && !usesDeleteMarker) {
       if (payloadClazz.equals(classOf[PartialUpdateAvroPayload].getName)
         || payloadClazz.equals(classOf[EventTimeAvroPayload].getName)
         || payloadClazz.equals(classOf[DefaultHoodieRecordPayload].getName))
       {
+        // Expected results after all operations with _event_lsn collisions:
+        // - rider-X (_event_lsn=1): deleted with higher ordering (ts=12)
+        // - rider-Y (_event_lsn=2): updated with higher ordering (ts=11), lower ordering rider-BB rejected (ts=8 < 11)
+        // - _event_lsn=3: DELETED by delete operation (was rider-CC with ts=12)
+        // - _event_lsn=4: rider-D stays with original data (rider-DD ts=9 < rider-D ts=10)
+        // - _event_lsn=5: DELETED by delete operation (was rider-EE with ts=12)
         Seq(
           (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
           (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
           (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
-          (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+          (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+          (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
       } else {
+        // For other payload types (OverwriteWithLatestAvroPayload, OverwriteNonDefaultsWithLatestAvroPayload)
+        // These use COMMIT_TIME_ORDERING, so latest write wins regardless of ts value
+        // _event_lsn=2: rider-BB overwrites rider-Y (latest commit wins even though ts=8 < ts=11)
+        // _event_lsn=3: rider-CC overwrites (latest commit), then deleted
+        // _event_lsn=4: rider-DD overwrites (latest commit)
+        // _event_lsn=5: rider-EE overwrites (latest commit), then deleted
         Seq(
           (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
-          (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
+          (8, 2, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
           (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
-          (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+          (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+          (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
       }
     } else {
+      // For CDC payloads or when delete markers are used
       if (payloadClazz.equals(classOf[DefaultHoodieRecordPayload].getName)) {
+        // Delete markers remove records completely
+        // Note: rider-B keeps original values because rider-BB update (ts=8) was rejected (8 < 11) for EVENT_TIME
+        // Note: rider-D keeps original values because rider-DD update (ts=9) was rejected (9 < 10)
+        // _event_lsn=1 (rider-X) and _event_lsn=5 (rider-EE) are deleted by delete markers
         Seq(
           (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
-          (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"))
+          (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
+          (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+          (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+      } else if (payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+        // AWSDmsAvroPayload uses COMMIT_TIME_ORDERING - latest commit wins regardless of ts value
+        // Mixed batch: rider-BB applies (latest commit wins even though ts=8 < ts=11), rider-CC update applies (latest commit)
+        // Second update: rider-DD applies (latest commit wins over rider-D)
+        // Final: _event_lsn=2 has rider-BB, _event_lsn=3 and _event_lsn=5 deleted, _event_lsn=4 has rider-DD
+        Seq(
+          (8, 2, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
+          (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
+          (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+          (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+      } else if (payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
+        // PostgresDebeziumAvroPayload uses EVENT_TIME_ORDERING with _event_lsn field
+        // But second update applies rider-DD due to later commit time (behaves like commit-time ordering)
+        // rider-BB behaves like commit-time ordering too (latest commit wins)
+        Seq(
+          (8, 2, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
+          (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
+          (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+          (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
       } else {
+        // For MySqlDebeziumAvroPayload
+        // Uses EVENT_TIME_ORDERING with _event_seq in v8, then _event_bin_file,_event_pos in v9
+        // Mixed ordering batch (v8): rider-BB has lower _event_seq (8.1 < 11.1) so REJECTED, rider-Y stays
+        // Second update (v9): rider-DD has higher _event_bin_file (12 > 10) so APPLIED
         Seq(
           (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
-          (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"))
+          (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
+          (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+          (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
       }
     }
   }
 
   def getExpectedResultForTimeTravelQuery(payloadClazz: String, usesDeleteMarker: Boolean):
   Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)] = {
+    // Time travel query shows state after first update but BEFORE mixed ordering batch
+    // So rider-C, rider-D, rider-E should still have their original values
     if (!isCDCPayload(payloadClazz) && !usesDeleteMarker) {
       Seq(
         (12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
         (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
-        (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
-        (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
-        (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"),
-        (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+        (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"), // Original rider-C before mixed ordering
+        (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),   // Original rider-D before mixed ordering
+        (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))   // Original rider-E before mixed ordering
     } else {
+      // For CDC payloads or when delete markers are used
       Seq(
         (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
-        (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
-        (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
-        (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))
+        (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"), // Original rider-C before mixed ordering
+        (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),   // Original rider-D before mixed ordering
+        (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))   // Original rider-E before mixed ordering
     }
   }
 
   private def isCDCPayload(payloadClazz: String) = {
     payloadClazz.equals(classOf[AWSDmsAvroPayload].getName) || payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName) || payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)
   }
+
+  /**
+   * Helper method to validate that compaction occurred for MOR tables
+   */
+  def validateCompaction(metaClient: HoodieTableMetaClient, tableType: String): Unit = {
+    if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      metaClient.reloadActiveTimeline()
+      val compactionInstants = metaClient.getActiveTimeline
+        .getCommitsAndCompactionTimeline
+        .getInstants
+        .asScala
+        .filter(_.getAction.equals("commit"))
+      assertTrue(compactionInstants.nonEmpty,
+        s"Compaction should have occurred for MOR table but found no commit instants")
+    }
+  }
+
+  /**
+   * Helper method to validate that clustering occurred
+   */
+  def validateClustering(metaClient: HoodieTableMetaClient): Unit = {
+    metaClient.reloadActiveTimeline()
+    val clusteringInstants = metaClient.getActiveTimeline
+      .getInstants
+      .asScala
+      .filter(_.getAction.equals("replacecommit")) // check if this is correct
+    assertTrue(clusteringInstants.nonEmpty,
+      s"Clustering should have occurred but found no replacecommit instants")
+  }
+
+  /**
+   * Helper method to validate that cleaning occurred
+   */
+  def validateCleaning(metaClient: HoodieTableMetaClient): Unit = {
+    metaClient.reloadActiveTimeline()
+    val cleanInstants = metaClient.getActiveTimeline
+      .getCleanerTimeline
+      .getInstants
+      .asScala
+    assertTrue(cleanInstants.nonEmpty,
+      s"Cleaning should have occurred but found no clean instants")
+  }
+
+  /**
+   * Helper method to validate that archival occurred
+   */
+  def validateArchival(metaClient: HoodieTableMetaClient): Unit = {
+    val archivedTimeline = metaClient.getArchivedTimeline
+    val archivedInstants = archivedTimeline.getInstants.asScala
+    assertTrue(archivedInstants.nonEmpty,
+      s"Archival should have occurred but found no archived instants")
+  }
+
+  /**
+   * Helper method to validate all table management operations
+   */
+  def validateTableManagementOps(metaClient: HoodieTableMetaClient,
+                                  tableType: String,
+                                  expectCompaction: Boolean = true,
+                                  expectClustering: Boolean = true,
+                                  expectCleaning: Boolean = true,
+                                  expectArchival: Boolean = true): Unit = {
+    metaClient.reloadActiveTimeline()
+
+    // Validate compaction for MOR tables
+    if (expectCompaction) {
+      validateCompaction(metaClient, tableType)
+    }
+
+    // Validate clustering
+    if (expectClustering) {
+      validateClustering(metaClient)
+    }
+
+    // Validate cleaning
+    if (expectCleaning) {
+      validateCleaning(metaClient)
+    }
+
+    // Validate archival
+    if (expectArchival) {
+      validateArchival(metaClient)
+    }
+  }
 }
 
 object TestPayloadDeprecationFlow {
@@ -450,7 +793,11 @@
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -458,7 +805,13 @@
         "true",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -466,7 +819,11 @@
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -475,7 +832,13 @@
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -486,7 +849,16 @@
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
           HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
-            -> "__debezium_unavailable_value")
+            -> "__debezium_unavailable_value"),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PRECOMBINE_FIELD.key() -> "_event_lsn",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null,
+          HoodieTableConfig.ORDERING_FIELDS.key() -> null,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE -> null)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -495,8 +867,14 @@
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
-          HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))
-      ),
+          HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME)),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PRECOMBINE_FIELD.key() -> "_event_seq",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.ORDERING_FIELDS.key() -> null)),
       Arguments.of(
         "COPY_ON_WRITE",
         classOf[AWSDmsAvroPayload].getName,
@@ -505,7 +883,14 @@
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
-          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D"),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> null,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> null)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -513,7 +898,11 @@
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "COPY_ON_WRITE",
@@ -522,7 +911,14 @@
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
+        ),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -530,7 +926,11 @@
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -538,7 +938,13 @@
         "true",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -546,7 +952,11 @@
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -555,7 +965,13 @@
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -566,7 +982,16 @@
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
           HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
-            -> "__debezium_unavailable_value")
+            -> "__debezium_unavailable_value"),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PRECOMBINE_FIELD.key() -> "_event_lsn",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null,
+          HoodieTableConfig.ORDERING_FIELDS.key() -> null,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE -> null)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -575,8 +1000,14 @@
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
-          HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))
-      ),
+          HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME)),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.PRECOMBINE_FIELD.key() -> "_event_seq",
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.ORDERING_FIELDS.key() -> null)),
       Arguments.of(
         "MERGE_ON_READ",
         classOf[AWSDmsAvroPayload].getName,
@@ -585,7 +1016,14 @@
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
           HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
-          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D"),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> null,
+          HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> null)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -593,7 +1031,11 @@
         "false",
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
-          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName)
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       ),
       Arguments.of(
         "MERGE_ON_READ",
@@ -602,7 +1044,14 @@
         Map(
           HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
           HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
-          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
+        ),
+        Map(
+          HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
+          HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+          HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+          HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
       )
     )
   }