[GRIFFIN-358] Update Duplication Measure to exclude null values
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
index b1eaad3..12447dd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -55,18 +55,22 @@
 
   override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
     val input = sparkSession.read.table(measureParam.getDataSource)
-
-    val duplicateCol = when(col(__Temp) > 1, 1).otherwise(0)
-    val uniqueCol = when(col(Unique) =!= 1, 0).otherwise(1)
-    val distinctCol = when(col(Unique) === 1 or col(NonUnique) === 1, 1).otherwise(0)
-    val nonUniqueCol =
-      when(col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1).otherwise(0)
-
     val cols = keyCols(input).map(col)
+
+    val isNullCol = cols.map(x => x.isNull).reduce(_ and _)
+    val duplicateCol = when(col(__Temp) > 1, 1).otherwise(0)
+    val uniqueCol = when(not(isNullCol) and col(Unique) === 1, 1).otherwise(0)
+    val distinctCol =
+      when(not(isNullCol) and (col(Unique) === 1 or col(NonUnique) === 1), 1).otherwise(0)
+    val nonUniqueCol =
+      when(not(isNullCol) and col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1)
+        .otherwise(0)
+
     val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
 
     val aggDf = input
       .select(col(AllColumns), row_number().over(window).as(__Temp))
+      .withColumn(IsNull, isNullCol)
       .withColumn(Duplicate, duplicateCol)
       .withColumn(Unique, count(lit(1)).over(window))
       .withColumn(Unique, uniqueCol)
@@ -74,7 +78,7 @@
       .withColumn(NonUnique, nonUniqueCol)
       .withColumn(Distinct, distinctCol)
       .withColumn(valueColumn, col(badnessExpr))
-      .drop(__Temp)
+      .drop(__Temp, IsNull)
 
     val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
     val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e).cast("string")))
@@ -114,6 +118,7 @@
 }
 
 object DuplicationMeasure {
+  final val IsNull: String = "is_null"
   final val Duplicate: String = "duplicate"
   final val Unique: String = "unique"
   final val NonUnique: String = "non_unique"
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
index d50b202..984688d 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -140,9 +140,9 @@
 
     val metricMap = row.getAs[Map[String, String]](Metrics)
     assertResult(metricMap(Duplicate))("1")
-    assertResult(metricMap(Unique))("3")
+    assertResult(metricMap(Unique))("2")
     assertResult(metricMap(NonUnique))("1")
-    assertResult(metricMap(Distinct))("4")
+    assertResult(metricMap(Distinct))("3")
   }
 
 }