test: Add lower and higher ordering values to payloads in test (#14001)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
index 0db2323..76e23f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala
@@ -21,14 +21,15 @@
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, ORDERING_FIELDS, RECORDKEY_FIELD, TABLE_TYPE}
-import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig, TypedProperties}
import org.apache.hudi.common.model.{AWSDmsAvroPayload, DefaultHoodieRecordPayload, EventTimeAvroPayload, HoodieRecordMerger, HoodieTableType, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload}
import org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
import org.apache.hudi.common.model.debezium.{DebeziumConstants, MySqlDebeziumAvroPayload, PostgresDebeziumAvroPayload}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.SaveMode
@@ -45,10 +46,11 @@
*/
@ParameterizedTest
@MethodSource(Array("providePayloadClassTestCases"))
- def testMergerBuiltinPayloadUpgradePath(tableType: String,
+ def testMergerBuiltinPayloadUpgradeDowngradePath(tableType: String,
payloadClazz: String,
useOpAsDeleteStr: String,
- expectedConfigs: Map[String, String]): Unit = {
+ expectedConfigs: Map[String, String],
+ expectedDowngradeConfigs: Map[String, String]): Unit = {
val useOpAsDelete = useOpAsDeleteStr.equals("true")
val deleteOpts: Map[String, String] = if (useOpAsDelete) {
Map(DefaultHoodieRecordPayload.DELETE_KEY -> "Op", DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
@@ -59,6 +61,14 @@
HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
HoodieMetadataConfig.ENABLE.key() -> "false") ++ deleteOpts
+ // Common table service configurations
+ val serviceOpts: Map[String, String] = Map(
+ HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
+ HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2",
+ HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key() -> "512000",
+ HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key() -> "512000"
+ )
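+ // Inline clustering is scheduled every 2 commits; the lowered 512 KB small-file and target-file
+ // limits keep clustering plans small enough for the tiny files written by this test.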
+
val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq",
DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME, DebeziumConstants.FLATTENED_OP_COL_NAME)
// 1. Add an insert.
@@ -67,33 +77,44 @@
(10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1, "i"),
(10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
(10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
- (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"),
- (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+ (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))
val inserts = spark.createDataFrame(data).toDF(columns: _*)
- val originalOrderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
- "_event_seq"
+ val originalOrderingFields: Option[String] = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+ Some("_event_seq")
} else if (payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
- "_event_lsn"
+ Some("_event_lsn")
+ } else if (payloadClazz.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+ || payloadClazz.equals(classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName)
+ || payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+ None // Don't set ordering fields for commit-time ordering payloads
} else {
- "ts"
+ Some("ts")
}
val expectedOrderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
"_event_bin_file,_event_pos"
} else if (payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
"_event_lsn"
} else {
- originalOrderingFields
+ originalOrderingFields.orNull
}
- inserts.write.format("hudi").
- option(RECORDKEY_FIELD.key(), "_event_lsn").
- option(HoodieTableConfig.ORDERING_FIELDS.key(), originalOrderingFields).
- option(TABLE_TYPE.key(), tableType).
+ val insertWriter = inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "_event_lsn")
+ val writerWithOrdering = originalOrderingFields match {
+ case Some(fields) => insertWriter.option(HoodieTableConfig.ORDERING_FIELDS.key(), fields)
+ case None => insertWriter
+ }
+ writerWithOrdering.option(TABLE_TYPE.key(), tableType).
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ option(OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
+ option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), "2048").
+ option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1024").
+ options(serviceOpts).
options(opts).
mode(SaveMode.Overwrite).
save(basePath)
+
// Verify table was created successfully
var metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
@@ -108,46 +129,59 @@
(11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1, "i"),
(12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
(11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"))
- val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
- firstUpdate.write.format("hudi").
- option(OPERATION.key(), "upsert").
- option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
- option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
- option(HoodieTableConfig.ORDERING_FIELDS.key(), originalOrderingFields).
- options(opts).
- mode(SaveMode.Append).
- save(basePath)
+ performUpsert(firstUpdateData, columns, serviceOpts, opts, basePath,
+ tableVersion = Some("8"), orderingFields = originalOrderingFields)
// Validate table version.
- metaClient = HoodieTableMetaClient.reload(metaClient)
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode())
val firstUpdateInstantTime = metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
- // 3. Add an update. This is expected to trigger the upgrade
+ // 3. Add mixed ordering test data to validate ordering handling:
+ // updates/deletes with lower ordering values should be ignored by event-time ordering payloads
+ // (and applied by commit-time ordering payloads), while higher ordering values are always applied.
+ val mixedOrderingData = Seq(
+ // Update _event_lsn=2 (currently rider-Y, ts=11) with LOWER ordering - EVENT_TIME: IGNORED, COMMIT_TIME: APPLIED
+ (8, 2L, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
+ // Update rider-C with HIGHER ordering - should be APPLIED
+ (11, 3L, "rider-CC", "driver-CC", 35.00, "u", "15.1", 15, 1, "u"),
+ // Update rider-C with LOWER ordering - should be IGNORED (rider-C has ts=10 originally)
+ (8, 3L, "rider-CC", "driver-CC", 30.00, "u", "8.1", 8, 1, "u"),
+ // Delete rider-E with LOWER ordering - should be IGNORED (rider-E has ts=10 originally)
+ (9, 5L, "rider-EE", "driver-EE", 17.85, "D", "9.1", 9, 1, "d"))
+ performUpsert(mixedOrderingData, columns, serviceOpts, opts, basePath,
+ tableVersion = Some("8"), orderingFields = originalOrderingFields)
+ // Validate table version is still 8 after mixed ordering batch
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
+ assertEquals(8, metaClient.getTableConfig.getTableVersion.versionCode())
+
+ // 4. Add an update. This is expected to trigger the upgrade
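+ // Inline compaction is only meaningful for MOR tables (log files get compacted into base files);
+ // COW tables rewrite base files on every commit, so compaction stays disabled for them.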
+ val compactionEnabled = if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) "true" else "false"
val secondUpdateData = Seq(
(12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"),
// For rider-DD we purposefully deviate and set the _event_seq to be less than the _event_bin_file and _event_pos
// so that the test will fail if _event_seq is still used for ordering
(9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
(12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
- val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
- secondUpdate.write.format("hudi").
- option(OPERATION.key(), "upsert").
- option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "9").
- options(opts).
- mode(SaveMode.Append).
- save(basePath)
+ performUpsert(secondUpdateData, columns, serviceOpts, opts, basePath,
+ tableVersion = Some("9"), compactionEnabled = compactionEnabled)
// Validate table version as 9.
- metaClient = HoodieTableMetaClient.reload(metaClient)
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
assertEquals(payloadClazz, metaClient.getTableConfig.getLegacyPayloadClass)
assertEquals(isCDCPayload(payloadClazz) || useOpAsDelete,
metaClient.getTableConfig.getProps.containsKey(RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY))
- assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(""))
- val compactionInstants = metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.getInstants
- val foundCompaction = compactionInstants.stream().anyMatch(i => i.getAction.equals("commit"))
- assertTrue(foundCompaction)
+ assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(null))
- // 4. Add a trivial update to trigger payload class mismatch.
+ // 5. Add a trivial update to trigger payload class mismatch.
val thirdUpdateData = Seq(
(12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"))
val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
@@ -164,7 +198,7 @@
}
}
- // 5. Add a delete.
+ // 6. Add a delete.
val fourthUpdateData = Seq(
(12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"),
(12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
@@ -173,10 +207,28 @@
option(OPERATION.key(), "delete").
option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+ options(serviceOpts).
mode(SaveMode.Append).
save(basePath)
- // 6. Validate.
+ // 7. Add INSERT operation.
+ val insertData = Seq(
+ (13, 6L, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+ (13, 7L, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+ performInsert(insertData, columns, serviceOpts, basePath)
+
+ // Final validation of table management operations after all writes
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
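+ // Cleaning and archival are not expected here: with the default retention settings the small
+ // number of commits in this test stays below the clean/archive trigger thresholds.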
+ validateTableManagementOps(metaClient, tableType,
+ expectCompaction = tableType.equals(HoodieTableType.MERGE_ON_READ.name()),
+ expectClustering = true, // Enable clustering validation with lowered thresholds
+ expectCleaning = false,
+ expectArchival = false)
+
+ // 8. Validate.
// Validate table configs.
tableConfig = metaClient.getTableConfig
expectedConfigs.foreach { case (key, expectedValue) =>
@@ -206,6 +258,9 @@
expectedTimeTravelDf.except(timeTravelDf).isEmpty
&& timeTravelDf.except(expectedTimeTravelDf).isEmpty)
}
+
+ // 9-10. Perform downgrade and validation
+ performDowngradeAndValidate(metaClient, basePath, expectedDowngradeConfigs, expectedData, columns, serviceOpts)
}
@ParameterizedTest
@@ -213,7 +268,8 @@
def testMergerBuiltinPayloadFromTableCreationPath(tableType: String,
payloadClazz: String,
useOpAsDeleteStr: String,
- expectedConfigs: Map[String, String]): Unit = {
+ expectedConfigs: Map[String, String],
+ expectedDowngradeConfigs: Map[String, String]): Unit = {
val useOpAsDelete = useOpAsDeleteStr.equals("true")
val deleteOpts: Map[String, String] = if (useOpAsDelete) {
Map(DefaultHoodieRecordPayload.DELETE_KEY -> "Op", DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
@@ -223,6 +279,15 @@
val opts: Map[String, String] = Map(
HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
HoodieMetadataConfig.ENABLE.key() -> "false") ++ deleteOpts
+
+ // Common table service configurations
+ val serviceOpts: Map[String, String] = Map(
+ HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
+ HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2",
+ HoodieClusteringConfig.PLAN_STRATEGY_SMALL_FILE_LIMIT.key() -> "512000",
+ HoodieClusteringConfig.PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key() -> "512000"
+ )
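+ // Inline clustering is scheduled every 2 commits; the lowered 512 KB small-file and target-file
+ // limits keep clustering plans small enough for the tiny files written by this test.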
+
val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq",
DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME, DebeziumConstants.FLATTENED_OP_COL_NAME)
// 1. Add an insert.
@@ -231,27 +296,37 @@
(10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1, "i"),
(10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
(10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
- (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"),
- (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+ (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))
val inserts = spark.createDataFrame(data).toDF(columns: _*)
- val originalOrderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
- "_event_seq"
+ val originalOrderingFields: Option[String] = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+ Some("_event_seq")
+ } else if (payloadClazz.equals(classOf[OverwriteWithLatestAvroPayload].getName)
+ || payloadClazz.equals(classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName)
+ || payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+ None // Don't set ordering fields for commit-time ordering payloads
} else {
- "ts"
+ Some("ts")
}
val expectedOrderingFields = if (payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
"_event_bin_file,_event_pos"
} else if (payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
"_event_lsn"
} else {
- originalOrderingFields
+ originalOrderingFields.orNull
}
- inserts.write.format("hudi").
- option(RECORDKEY_FIELD.key(), "_event_lsn").
- option(ORDERING_FIELDS.key(), originalOrderingFields).
- option(TABLE_TYPE.key(), tableType).
+ val insertWriter = inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "_event_lsn")
+ val writerWithOrdering = originalOrderingFields match {
+ case Some(fields) => insertWriter.option(ORDERING_FIELDS.key(), fields)
+ case None => insertWriter
+ }
+ writerWithOrdering.option(TABLE_TYPE.key(), tableType).
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ option(OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), "2048").
+ option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1024").
+ options(serviceOpts).
options(opts).
mode(SaveMode.Overwrite).
save(basePath)
@@ -278,21 +353,38 @@
(11, 1L, "rider-X", "driver-X", 19.10, "i", "11.1", 11, 1, "i"),
(12, 1L, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
(11, 2L, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"))
- val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
- firstUpdate.write.format("hudi").
- option(OPERATION.key(), "upsert").
- option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
- mode(SaveMode.Append).
- save(basePath)
+ performUpsert(firstUpdateData, columns, serviceOpts, Map.empty, basePath)
// Validate table version.
- metaClient = HoodieTableMetaClient.reload(metaClient)
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
// validate ordering fields
- assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(""))
+ assertEquals(expectedOrderingFields, metaClient.getTableConfig.getOrderingFieldsStr.orElse(null))
val firstUpdateInstantTime = metaClient.getActiveTimeline.getInstants.get(1).requestedTime()
+ // 3. Add mixed ordering test data to validate ordering handling:
+ // updates/deletes with lower ordering values should be ignored by event-time ordering payloads
+ // (and applied by commit-time ordering payloads), while higher ordering values are always applied.
+ val mixedOrderingData = Seq(
+ // Update _event_lsn=2 (currently rider-Y, ts=11) with LOWER ordering - EVENT_TIME: IGNORED, COMMIT_TIME: APPLIED
+ (8, 2L, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
+ // Update rider-C with HIGHER ordering - should be APPLIED
+ (11, 3L, "rider-CC", "driver-CC", 35.00, "u", "15.1", 15, 1, "u"),
+ // Update rider-C with LOWER ordering - should be IGNORED (rider-C has ts=10 originally)
+ (8, 3L, "rider-CC", "driver-CC", 30.00, "u", "8.1", 8, 1, "u"),
+ // Delete rider-E with LOWER ordering - should be IGNORED (rider-E has ts=10 originally)
+ (9, 5L, "rider-EE", "driver-EE", 17.85, "D", "9.1", 9, 1, "d"))
+ performUpsert(mixedOrderingData, columns, serviceOpts, opts, basePath)
+ // Validate table version is still 9 after mixed ordering batch
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
+ assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
- // 3. Add an update. This is expected to trigger the upgrade
+ // 4. Add an update. This is expected to trigger the upgrade
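+ // Inline compaction is only meaningful for MOR tables (log files get compacted into base files);
+ // COW tables rewrite base files on every commit, so compaction stays disabled for them.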
val compactionEnabled = if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
"true"
} else {
@@ -304,22 +396,17 @@
// so that the test will fail if _event_seq is still used for ordering
(9, 4L, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
(12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
- val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
- secondUpdate.write.format("hudi").
- option(OPERATION.key(), "upsert").
- option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled).
- option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
- mode(SaveMode.Append).
- save(basePath)
+ performUpsert(secondUpdateData, columns, serviceOpts, Map.empty, basePath,
+ compactionEnabled = compactionEnabled)
// Validate table version as 9.
- metaClient = HoodieTableMetaClient.reload(metaClient)
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
assertEquals(9, metaClient.getTableConfig.getTableVersion.versionCode())
assertEquals(payloadClazz, metaClient.getTableConfig.getLegacyPayloadClass)
- val compactionInstants = metaClient.getActiveTimeline.getCommitsAndCompactionTimeline.getInstants
- val foundCompaction = compactionInstants.stream().anyMatch(i => i.getAction.equals("commit"))
- assertTrue(foundCompaction)
- // 4. Add a trivial update to trigger payload class mismatch.
+ // 5. Add a trivial update to trigger payload class mismatch.
val thirdUpdateData = Seq(
(12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"))
val thirdUpdate = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
@@ -336,7 +423,7 @@
}
}
- // 5. Add a delete.
+ // 6. Add a delete.
val fourthUpdateData = Seq(
(12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"),
(12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))
@@ -345,10 +432,28 @@
option(OPERATION.key(), "delete").
option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
+ options(serviceOpts).
mode(SaveMode.Append).
save(basePath)
- // 6. Validate.
+ // 7. Add INSERT operation.
+ val insertData = Seq(
+ (13, 6L, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+ (13, 7L, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+ performInsert(insertData, columns, serviceOpts, basePath)
+
+ // Final validation of table management operations after all writes
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
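+ // Cleaning and archival are not expected here: with the default retention settings the small
+ // number of commits in this test stays below the clean/archive trigger thresholds.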
+ validateTableManagementOps(metaClient, tableType,
+ expectCompaction = tableType.equals(HoodieTableType.MERGE_ON_READ.name()),
+ expectClustering = true, // Enable clustering validation with lowered thresholds
+ expectCleaning = false,
+ expectArchival = false)
+
+ // 8. Validate.
// Validate table configs again.
tableConfig = metaClient.getTableConfig
expectedConfigs.foreach { case (key, expectedValue) =>
@@ -376,6 +481,9 @@
assertTrue(
expectedTimeTravelDf.except(timeTravelDf).isEmpty
&& timeTravelDf.except(expectedTimeTravelDf).isEmpty)
+
+ // 9-10. Perform downgrade and validation
+ performDowngradeAndValidate(metaClient, basePath, expectedDowngradeConfigs, expectedData, columns, serviceOpts)
}
def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
@@ -386,59 +494,294 @@
.build()
}
+ /**
+ * Helper method to perform upsert operations with configurable options
+ */
+ def performUpsert(data: Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)],
+ columns: Seq[String],
+ serviceOpts: Map[String, String],
+ opts: Map[String, String],
+ basePath: String,
+ tableVersion: Option[String] = None,
+ orderingFields: Option[String] = None,
+ compactionEnabled: String = "false",
+ compactionNumDeltaCommits: String = "1"): Unit = {
+ val dataFrame = spark.createDataFrame(data).toDF(columns: _*)
+ val writer = dataFrame.write.format("hudi")
+ .option(OPERATION.key(), "upsert")
+ .option(HoodieCompactionConfig.INLINE_COMPACT.key(), compactionEnabled)
+ .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), compactionNumDeltaCommits)
+ .options(serviceOpts)
+
+ val writerWithVersion = tableVersion match {
+ case Some(version) => writer.option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), version)
+ case None => writer
+ }
+
+ val writerWithOrdering = orderingFields match {
+ case Some(fields) => writerWithVersion.option(HoodieTableConfig.ORDERING_FIELDS.key(), fields)
+ case None => writerWithVersion
+ }
+
+ writerWithOrdering.options(opts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }
+
+ /**
+ * Helper method to perform insert operations
+ */
+ def performInsert(data: Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)],
+ columns: Seq[String],
+ serviceOpts: Map[String, String],
+ basePath: String): Unit = {
+ val dataFrame = spark.createDataFrame(data).toDF(columns: _*)
+ dataFrame.write.format("hudi")
+ .option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false")
+ .options(serviceOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ }
+
+ /**
+ * Helper method to perform downgrade from v9 to v8 and validate the results
+ */
+ def performDowngradeAndValidate(metaClient: HoodieTableMetaClient,
+ basePath: String,
+ expectedDowngradeConfigs: Map[String, String],
+ expectedData: Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)],
+ columns: Seq[String],
+ serviceOpts: Map[String, String]): Unit = {
+ // 9. Downgrade from v9 to v8
+ val writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withSchema(spark.read.format("hudi").load(basePath).schema.json)
+ .build()
+
+ new UpgradeDowngrade(metaClient, writeConfig, context, SparkUpgradeDowngradeHelper.getInstance)
+ .run(HoodieTableVersion.EIGHT, null)
+
+ // Reload metaClient to get updated table config
+ val downgradedMetaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storageConf())
+ .build()
+
+ // Validate table version is 8
+ assertEquals(8, downgradedMetaClient.getTableConfig.getTableVersion.versionCode())
+
+ // Validate downgrade configs
+ val downgradedTableConfig = downgradedMetaClient.getTableConfig
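+ // A null expected value means the downgrade must have removed that key from the table config.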
+ expectedDowngradeConfigs.foreach { case (key, expectedValue) =>
+ if (expectedValue != null) {
+ assertEquals(expectedValue, downgradedTableConfig.getString(key), s"Config $key should be $expectedValue after downgrade")
+ } else {
+ assertFalse(downgradedTableConfig.contains(key), s"Config $key should not be present after downgrade")
+ }
+ }
+
+ // 10. Add post-downgrade upsert to verify table functionality
+ performUpsert(
+ Seq((14, 10L, "rider-Z", "driver-Z", 45.50, "i", "14.1", 14, 1, "i")),
+ columns, serviceOpts, Map.empty, basePath)
+
+ // Validate data consistency after downgrade including new row
+ val downgradeDf = spark.read.format("hudi").load(basePath)
+ val downgradeFinalDf = downgradeDf.select("ts", "_event_lsn", "rider", "driver", "fare", "Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME, DebeziumConstants.FLATTENED_OP_COL_NAME).sort("_event_lsn")
+
+ // Create expected data including the new post-downgrade row
+ val expectedDataWithNewRow = expectedData ++ Seq((14, 10L, "rider-Z", "driver-Z", 45.50, "i", "14.1", 14, 1, "i"))
+ val expectedDfWithNewRow = spark.createDataFrame(spark.sparkContext.parallelize(expectedDataWithNewRow)).toDF(columns: _*).sort("_event_lsn")
+ assertTrue(expectedDfWithNewRow.except(downgradeFinalDf).isEmpty && downgradeFinalDf.except(expectedDfWithNewRow).isEmpty,
+ "Data should remain consistent after downgrade including new row")
+ }
+
def getExpectedResultForSnapshotQuery(payloadClazz: String, usesDeleteMarker: Boolean): Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)] = {
if (!isCDCPayload(payloadClazz) && !usesDeleteMarker) {
if (payloadClazz.equals(classOf[PartialUpdateAvroPayload].getName)
|| payloadClazz.equals(classOf[EventTimeAvroPayload].getName)
|| payloadClazz.equals(classOf[DefaultHoodieRecordPayload].getName))
{
+ // Expected results after all operations with _event_lsn collisions:
+ // - rider-X (_event_lsn=1): updated by the higher-ordering (ts=12) row carrying Op="D"; without delete-marker semantics the row is retained
+ // - rider-Y (_event_lsn=2): updated with higher ordering (ts=11), lower ordering rider-BB rejected (ts=8 < 11)
+ // - _event_lsn=3: DELETED by delete operation (was rider-CC with ts=12)
+ // - _event_lsn=4: rider-D stays with original data (rider-DD ts=9 < rider-D ts=10)
+ // - _event_lsn=5: DELETED by delete operation (was rider-EE with ts=12)
Seq(
(12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
(10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
- (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+ (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+ (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
} else {
+ // Other payload types (OverwriteWithLatestAvroPayload, OverwriteNonDefaultsWithLatestAvroPayload)
+ // use COMMIT_TIME_ORDERING, so the latest write wins regardless of ts value:
+ // _event_lsn=2: rider-BB overwrites rider-Y (latest commit wins even though ts=8 < ts=11)
+ // _event_lsn=3: rider-CC overwrites (latest commit), then deleted
+ // _event_lsn=4: rider-DD overwrites (latest commit)
+ // _event_lsn=5: rider-EE overwrites (latest commit), then deleted
Seq(
(12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
- (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
+ (8, 2, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
(9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
- (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+ (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+ (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
}
} else {
+ // For CDC payloads or when delete markers are used
if (payloadClazz.equals(classOf[DefaultHoodieRecordPayload].getName)) {
+ // Delete markers remove records completely
+ // Note: _event_lsn=2 keeps rider-Y's values because the rider-BB update (ts=8) was rejected (8 < 11) for EVENT_TIME
+ // Note: rider-D keeps original values because rider-DD update (ts=9) was rejected (9 < 10)
+ // _event_lsn=1 (rider-X) and _event_lsn=5 (rider-EE) are deleted by delete markers
Seq(
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
- (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"))
+ (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
+ (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+ (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+ } else if (payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+ // AWSDmsAvroPayload uses COMMIT_TIME_ORDERING - latest commit wins regardless of ts value
+ // Mixed batch: rider-BB applies (latest commit wins even though ts=8 < ts=11), rider-CC update applies (latest commit)
+ // Second update: rider-DD applies (latest commit wins over rider-D)
+ // Final: _event_lsn=2 has rider-BB, _event_lsn=3 and _event_lsn=5 deleted, _event_lsn=4 has rider-DD
+ Seq(
+ (8, 2, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
+ (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
+ (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+ (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
+ } else if (payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
+ // PostgresDebeziumAvroPayload uses EVENT_TIME_ORDERING on the _event_lsn field, which is also the record key here,
+ // so ordering values tie within a key and the latest commit wins (effectively commit-time ordering):
+ // rider-BB overwrites rider-Y and the second update applies rider-DD
+ Seq(
+ (8, 2, "rider-BB", "driver-BB", 25.00, "u", "8.1", 8, 1, "u"),
+ (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
+ (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+ (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
} else {
+ // For MySqlDebeziumAvroPayload
+ // Uses EVENT_TIME_ORDERING with _event_seq in v8, then _event_bin_file,_event_pos in v9
+ // Mixed ordering batch (v8): rider-BB has lower _event_seq (8.1 < 11.1) so REJECTED, rider-Y stays
+ // Second update (v9): rider-DD has higher _event_bin_file (12 > 10) so APPLIED
Seq(
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
- (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"))
+ (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1", 12, 1, "i"),
+ (13, 6, "rider-G", "driver-G", 25.50, "i", "13.1", 13, 1, "i"),
+ (13, 7, "rider-H", "driver-H", 30.25, "i", "13.1", 13, 1, "i"))
}
}
}
def getExpectedResultForTimeTravelQuery(payloadClazz: String, usesDeleteMarker: Boolean):
Seq[(Int, Long, String, String, Double, String, String, Int, Int, String)] = {
+ // Time travel query shows state after first update but BEFORE mixed ordering batch
+ // So rider-C, rider-D, rider-E should still have their original values
if (!isCDCPayload(payloadClazz) && !usesDeleteMarker) {
Seq(
(12, 1, "rider-X", "driver-X", 20.10, "D", "12.1", 12, 1, "d"),
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
- (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
- (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
- (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"),
- (10, 6L, "rider-F", "driver-F", 17.38, "D", "10.1", 10, 1, "d"))
+ (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"), // Original rider-C before mixed ordering
+ (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"), // Original rider-D before mixed ordering
+ (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i")) // Original rider-E before mixed ordering
} else {
+ // For CDC payloads or when delete markers are used
Seq(
(11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1", 11, 1, "u"),
- (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
- (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
- (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))
+ (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"), // Original rider-C before mixed ordering
+ (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"), // Original rider-D before mixed ordering
+ (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i")) // Original rider-E before mixed ordering
}
}
private def isCDCPayload(payloadClazz: String) = {
payloadClazz.equals(classOf[AWSDmsAvroPayload].getName) || payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName) || payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)
}
+
+ /**
+ * Helper method to validate that compaction occurred for MOR tables
+ */
+ def validateCompaction(metaClient: HoodieTableMetaClient, tableType: String): Unit = {
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ metaClient.reloadActiveTimeline()
+ val compactionInstants = metaClient.getActiveTimeline
+ .getCommitsAndCompactionTimeline
+ .getInstants
+ .asScala
+ .filter(_.getAction.equals("commit"))
+ assertTrue(compactionInstants.nonEmpty,
+ s"Compaction should have occurred for MOR table but found no commit instants")
+ }
+ }
+
+ /**
+ * Helper method to validate that clustering occurred
+ */
+ def validateClustering(metaClient: HoodieTableMetaClient): Unit = {
+ metaClient.reloadActiveTimeline()
+ val clusteringInstants = metaClient.getActiveTimeline
+ .getInstants
+ .asScala
+ .filter(_.getAction.equals("replacecommit")) // check if this is correct
+ assertTrue(clusteringInstants.nonEmpty,
+ s"Clustering should have occurred but found no replacecommit instants")
+ }
+
+ /**
+ * Helper method to validate that cleaning occurred
+ */
+ def validateCleaning(metaClient: HoodieTableMetaClient): Unit = {
+ metaClient.reloadActiveTimeline()
+ val cleanInstants = metaClient.getActiveTimeline
+ .getCleanerTimeline
+ .getInstants
+ .asScala
+ assertTrue(cleanInstants.nonEmpty,
+ s"Cleaning should have occurred but found no clean instants")
+ }
+
+ /**
+ * Helper method to validate that archival occurred
+ */
+ def validateArchival(metaClient: HoodieTableMetaClient): Unit = {
+ val archivedTimeline = metaClient.getArchivedTimeline
+ val archivedInstants = archivedTimeline.getInstants.asScala
+ assertTrue(archivedInstants.nonEmpty,
+ s"Archival should have occurred but found no archived instants")
+ }
+
+ /**
+ * Helper method to validate all table management operations
+ */
+ def validateTableManagementOps(metaClient: HoodieTableMetaClient,
+ tableType: String,
+ expectCompaction: Boolean = true,
+ expectClustering: Boolean = true,
+ expectCleaning: Boolean = true,
+ expectArchival: Boolean = true): Unit = {
+ metaClient.reloadActiveTimeline()
+
+ // Validate compaction for MOR tables
+ if (expectCompaction) {
+ validateCompaction(metaClient, tableType)
+ }
+
+ // Validate clustering
+ if (expectClustering) {
+ validateClustering(metaClient)
+ }
+
+ // Validate cleaning
+ if (expectCleaning) {
+ validateCleaning(metaClient)
+ }
+
+ // Validate archival
+ if (expectArchival) {
+ validateArchival(metaClient)
+ }
+ }
}
object TestPayloadDeprecationFlow {
@@ -450,7 +793,11 @@
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -458,7 +805,13 @@
"true",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
),
Arguments.of(
"COPY_ON_WRITE",
@@ -466,7 +819,11 @@
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -475,7 +832,13 @@
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -486,7 +849,16 @@
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
- -> "__debezium_unavailable_value")
+ -> "__debezium_unavailable_value"),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PRECOMBINE_FIELD.key() -> "_event_lsn",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null,
+ HoodieTableConfig.ORDERING_FIELDS.key() -> null,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE -> null)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -495,8 +867,14 @@
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
- HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))
- ),
+ HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME)),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PRECOMBINE_FIELD.key() -> "_event_seq",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.ORDERING_FIELDS.key() -> null)),
Arguments.of(
"COPY_ON_WRITE",
classOf[AWSDmsAvroPayload].getName,
@@ -505,7 +883,14 @@
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D"),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> null,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> null)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -513,7 +898,11 @@
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"COPY_ON_WRITE",
@@ -522,7 +911,14 @@
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
+ ),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"MERGE_ON_READ",
@@ -530,7 +926,11 @@
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"MERGE_ON_READ",
@@ -538,7 +938,13 @@
"true",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
),
Arguments.of(
"MERGE_ON_READ",
@@ -546,7 +952,11 @@
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"MERGE_ON_READ",
@@ -555,7 +965,13 @@
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[PartialUpdateAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"MERGE_ON_READ",
@@ -566,7 +982,16 @@
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "FILL_UNAVAILABLE",
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE
- -> "__debezium_unavailable_value")
+ -> "__debezium_unavailable_value"),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[PostgresDebeziumAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PRECOMBINE_FIELD.key() -> "_event_lsn",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null,
+ HoodieTableConfig.ORDERING_FIELDS.key() -> null,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + HoodieTableConfig.PARTIAL_UPDATE_UNAVAILABLE_VALUE -> null)
),
Arguments.of(
"MERGE_ON_READ",
@@ -575,8 +1000,14 @@
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
- HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME))
- ),
+ HoodieTableConfig.ORDERING_FIELDS.key() -> (DebeziumConstants.FLATTENED_FILE_COL_NAME + "," + DebeziumConstants.FLATTENED_POS_COL_NAME)),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[MySqlDebeziumAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.PRECOMBINE_FIELD.key() -> "_event_seq",
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.ORDERING_FIELDS.key() -> null)),
Arguments.of(
"MERGE_ON_READ",
classOf[AWSDmsAvroPayload].getName,
@@ -585,7 +1016,14 @@
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> "Op",
- HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> "D"),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[AWSDmsAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_KEY -> null,
+ HoodieTableConfig.RECORD_MERGE_PROPERTY_PREFIX + DefaultHoodieRecordPayload.DELETE_MARKER -> null)
),
Arguments.of(
"MERGE_ON_READ",
@@ -593,7 +1031,11 @@
"false",
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "EVENT_TIME_ORDERING",
- HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName)
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[EventTimeAvroPayload].getName,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
),
Arguments.of(
"MERGE_ON_READ",
@@ -602,7 +1044,14 @@
Map(
HoodieTableConfig.RECORD_MERGE_MODE.key() -> "COMMIT_TIME_ORDERING",
HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
- HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS")
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> "IGNORE_DEFAULTS"
+ ),
+ Map(
+ HoodieTableConfig.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName,
+ HoodieTableConfig.RECORD_MERGE_MODE.key() -> "CUSTOM",
+ HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+ HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME.key() -> null,
+ HoodieTableConfig.PARTIAL_UPDATE_MODE.key() -> null)
)
)
}