[HUDI-5482] Nulls should be counted in the value count stats for MOR tables (#7482)
* The behavior is kept in line with COW parquet file stats.
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 374d6fb..1d7481d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -159,6 +159,8 @@
final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), false);
final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
+ colStats.valueCount++;
+
if (fieldVal != null && canCompare(fieldSchema)) {
// Set the min value of the field
if (colStats.minValue == null
@@ -170,8 +172,6 @@
if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
colStats.maxValue = fieldVal;
}
-
- colStats.valueCount++;
} else {
colStats.nullCount++;
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 3bf8d35..1172b65 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -38,7 +38,7 @@
import org.junit.jupiter.api._
import org.junit.jupiter.api.condition.DisabledIf
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -130,6 +130,54 @@
}
@ParameterizedTest
+ @EnumSource(classOf[HoodieTableType]) // run once per HoodieTableType constant
+ def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType): Unit = { // checks that valueCount counts null values too
+ val metadataOpts = Map( // switches on the metadata table and its column stats index
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
+ val commonOpts = Map( // writer options; the table type comes from the test parameter
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ ) ++ metadataOpts
+
+ val schema = StructType(StructField("c1", IntegerType, false) :: StructField("c2", StringType, true) :: Nil) // c1: non-nullable int key, c2: nullable string
+ val inputDF = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, "v1"), Row(2, "v2"), Row(3, null), Row(4, "v4"))), // four rows, exactly one with a null c2
+ schema)
+
+ inputDF
+ .sort("c1", "c2")
+ .write
+ .format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite) // a single insert with Overwrite, so only this batch is in the table
+ .save(basePath)
+
+ metaClient = HoodieTableMetaClient.reload(metaClient) // reload the meta client after the write
+
+ val metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(toProperties(metadataOpts))
+ .build()
+
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+ columnStatsIndex.loadTransposed(Seq("c2"), false) { transposedDF => // NOTE(review): the `false` is presumably shouldReadInMemory - confirm against ColumnStatsIndexSupport
+ val result = transposedDF.select("valueCount", "c2_nullCount")
+ .collect().head // only the first row is inspected; NOTE(review): presumably all four records land in one file - confirm
+
+ assertTrue(result.getLong(0) == 4) // valueCount: all four rows, including the one whose c2 is null
+ assertTrue(result.getLong(1) == 1) // c2_nullCount: the single null c2 value
+ }
+ }
+
+ @ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = {
val targetColumnsToIndex = Seq("c1", "c2", "c3")