[GRIFFIN-358] Update Duplication Measure to exclude rows whose key columns are all null from the unique, non-unique and distinct counts
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
index b1eaad3..12447dd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -55,18 +55,22 @@
override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
val input = sparkSession.read.table(measureParam.getDataSource)
-
- val duplicateCol = when(col(__Temp) > 1, 1).otherwise(0)
- val uniqueCol = when(col(Unique) =!= 1, 0).otherwise(1)
- val distinctCol = when(col(Unique) === 1 or col(NonUnique) === 1, 1).otherwise(0)
- val nonUniqueCol =
- when(col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1).otherwise(0)
-
val cols = keyCols(input).map(col)
+
+ val isNullCol = cols.map(x => x.isNull).reduce(_ and _)
+ val duplicateCol = when(col(__Temp) > 1, 1).otherwise(0)
+ val uniqueCol = when(not(isNullCol) and col(Unique) === 1, 1).otherwise(0)
+ val distinctCol =
+ when(not(isNullCol) and (col(Unique) === 1 or col(NonUnique) === 1), 1).otherwise(0)
+ val nonUniqueCol =
+ when(not(isNullCol) and col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1)
+ .otherwise(0)
+
val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
val aggDf = input
.select(col(AllColumns), row_number().over(window).as(__Temp))
+ .withColumn(IsNull, isNullCol)
.withColumn(Duplicate, duplicateCol)
.withColumn(Unique, count(lit(1)).over(window))
.withColumn(Unique, uniqueCol)
@@ -74,7 +78,7 @@
.withColumn(NonUnique, nonUniqueCol)
.withColumn(Distinct, distinctCol)
.withColumn(valueColumn, col(badnessExpr))
- .drop(__Temp)
+ .drop(__Temp, IsNull)
val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e).cast("string")))
@@ -114,6 +118,7 @@
}
object DuplicationMeasure {
+ final val IsNull: String = "is_null"
final val Duplicate: String = "duplicate"
final val Unique: String = "unique"
final val NonUnique: String = "non_unique"
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
index d50b202..984688d 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -140,9 +140,9 @@
val metricMap = row.getAs[Map[String, String]](Metrics)
assertResult(metricMap(Duplicate))("1")
- assertResult(metricMap(Unique))("3")
+ assertResult(metricMap(Unique))("2")
assertResult(metricMap(NonUnique))("1")
- assertResult(metricMap(Distinct))("4")
+ assertResult(metricMap(Distinct))("3")
}
}