[GRIFFIN-358] Added DuplicationMeasureTest
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
index fc6e10f..b1eaad3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -25,7 +25,6 @@
 
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure
-import org.apache.griffin.measure.step.builder.ConstantColumns
 
 /**
  * Duplication Measure
@@ -40,14 +39,9 @@
  */
 case class DuplicationMeasure(measureParam: MeasureParam) extends Measure {
 
+  import DuplicationMeasure._
   import Measure._
 
-  private final val Duplicate: String = "duplicate"
-  private final val Unique: String = "unique"
-  private final val NonUnique: String = "non_unique"
-  private final val Distinct: String = "distinct"
-  private final val __Temp: String = "__temp"
-
   private final val duplicationMeasures = Seq(Duplicate, Unique, NonUnique, Distinct)
 
   val exprs: String = getFromConfig[String](Expression, null)
@@ -68,18 +62,11 @@
     val nonUniqueCol =
       when(col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1).otherwise(0)
 
-    val keyCols = {
-      if (StringUtil.isNullOrEmpty(exprs)) input.columns
-      else exprs.split(",").map(_.trim)
-    }.distinct
-
-    keyCols.foreach(c =>
-      assert(input.columns.contains(c), s"Provided column '$c' does not exist in the dataset."))
-
-    val window = Window.partitionBy(keyCols.map(col): _*).orderBy(ConstantColumns.tmst)
+    val cols = keyCols(input).map(col)
+    val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
 
     val aggDf = input
-      .select(col("*"), row_number().over(window).as(__Temp))
+      .select(col(AllColumns), row_number().over(window).as(__Temp))
       .withColumn(Duplicate, duplicateCol)
       .withColumn(Unique, count(lit(1)).over(window))
       .withColumn(Unique, uniqueCol)
@@ -90,7 +77,7 @@
       .drop(__Temp)
 
     val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
-    val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e)))
+    val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e).cast("string")))
     val metricColumn = map(selectCols: _*).as(valueColumn)
     val metricDf = aggDf
       .agg(metricAggCols.head, metricAggCols.tail: _*)
@@ -102,6 +89,13 @@
   }
 
   private def validate(): Unit = {
+    val input = SparkSession.getDefaultSession.get.read.table(measureParam.getDataSource)
+    val kc = keyCols(input)
+
+    assert(kc.nonEmpty, s"Columns defined in '$Expression' is empty.")
+    kc.foreach(c =>
+      assert(input.columns.contains(c), s"Provided column '$c' does not exist in the dataset."))
+
     assert(
       !StringUtil.isNullOrEmpty(badnessExpr),
       s"Invalid value '$badnessExpr' provided for $BadRecordDefinition")
@@ -111,4 +105,18 @@
       case _ => false
     }, s"Invalid value '$badnessExpr' was provided for $BadRecordDefinition")
   }
+
+  private def keyCols(input: DataFrame): Array[String] = {
+    if (StringUtil.isNullOrEmpty(exprs)) input.columns
+    else exprs.split(",").map(_.trim)
+  }.distinct
+
+}
+
+object DuplicationMeasure {
+  final val Duplicate: String = "duplicate"
+  final val Unique: String = "unique"
+  final val NonUnique: String = "non_unique"
+  final val Distinct: String = "distinct"
+  final val __Temp: String = "__temp"
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
new file mode 100644
index 0000000..7fed3d9
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.execution.impl.DuplicationMeasure._
+
+class DuplicationMeasureTest extends MeasureTest {
+  var param: MeasureParam = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    param = MeasureParam(
+      "param",
+      "Duplication",
+      "source",
+      Map(Expression -> "name", BadRecordDefinition -> "duplicate"))
+  }
+
+  "DuplicationMeasure" should "validate expression config" in {
+    // Validations for Duplication Expr
+
+    // Empty
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map.empty[String, String]))
+    }
+
+    // Empty
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> StringUtils.EMPTY)))
+    }
+
+    // Null
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> null)))
+    }
+
+    // Incorrect Type
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> 1234)))
+    }
+
+    // Validations for BadRecordDefinition
+
+    // Empty
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> "name")))
+    }
+
+    // Empty
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "")))
+    }
+
+    // Null
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> "name", BadRecordDefinition -> null)))
+    }
+
+    // Incorrect Type
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> "name", BadRecordDefinition -> 435)))
+    }
+
+    // Incorrect Value
+    assertThrows[AssertionError] {
+      DuplicationMeasure(
+        param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "xyz")))
+    }
+  }
+
+  it should "support metric writing" in {
+    val measure = DuplicationMeasure(param)
+    assertResult(true)(measure.supportsMetricWrite)
+  }
+
+  it should "support record writing" in {
+    val measure = DuplicationMeasure(param)
+    assertResult(true)(measure.supportsRecordWrite)
+  }
+
+  it should "execute defined measure expr" in {
+    val measure = DuplicationMeasure(param.copy(config = Map(BadRecordDefinition -> "duplicate")))
+    val (recordsDf, metricsDf) = measure.execute(context, None)
+
+    assertResult(recordsDf.schema)(recordDfSchema)
+    assertResult(metricsDf.schema)(metricDfSchema)
+
+    assertResult(recordsDf.count())(source.count())
+    assertResult(metricsDf.count())(1L)
+
+    val row = metricsDf.head()
+    assertResult(param.getDataSource)(row.getAs[String](DataSource))
+    assertResult(param.getName)(row.getAs[String](MeasureName))
+    assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+    val metricMap = row.getAs[Map[String, String]](Metrics)
+    println(metricMap)
+    assertResult(metricMap(Duplicate))("0")
+    assertResult(metricMap(Unique))("5")
+    assertResult(metricMap(NonUnique))("0")
+    assertResult(metricMap(Distinct))("5")
+  }
+
+  it should "supported complex measure expr" in {
+    val measure = DuplicationMeasure(
+      param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "duplicate")))
+    val (recordsDf, metricsDf) = measure.execute(context, None)
+
+    assertResult(recordsDf.schema)(recordDfSchema)
+    assertResult(metricsDf.schema)(metricDfSchema)
+
+    assertResult(recordsDf.count())(source.count())
+    assertResult(metricsDf.count())(1L)
+
+    val row = metricsDf.head()
+    assertResult(param.getDataSource)(row.getAs[String](DataSource))
+    assertResult(param.getName)(row.getAs[String](MeasureName))
+    assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+    val metricMap = row.getAs[Map[String, String]](Metrics)
+    assertResult(metricMap(Duplicate))("1")
+    assertResult(metricMap(Unique))("3")
+    assertResult(metricMap(NonUnique))("1")
+    assertResult(metricMap(Distinct))("4")
+  }
+
+}