[GRIFFIN-358] New Duplication (Distinctness, Uniqueness) Measure
diff --git a/measure/src/main/resources/config-batch-preproc.json b/measure/src/main/resources/config-batch-preproc.json
index 405f1fb..60f6900 100644
--- a/measure/src/main/resources/config-batch-preproc.json
+++ b/measure/src/main/resources/config-batch-preproc.json
@@ -18,6 +18,23 @@
           "select cast(part[0] as long) as user_id, part[1] as first_name, part[2] as last_name, part[3] as address, part[4] as email, part[5] as phone, part[6] as post_code from this"
         ]
       }
+    },
+    {
+      "name": "source_2",
+      "baseline": true,
+      "connector": {
+        "type": "file",
+        "config": {
+          "format": "text",
+          "paths": [
+            "measure/src/test/resources/duplicates_users_info_src.csv"
+          ]
+        },
+        "pre.proc": [
+          "select split(value, ',') as part from this",
+          "select cast(part[0] as int) as id, part[1] as name, part[2] as gender from this"
+        ]
+      }
     }
   ],
   "measures": [
@@ -76,6 +93,26 @@
           "name": "sql_records"
         }
       ]
+    },
+    {
+      "name": "duplication_measure",
+      "type": "duplication",
+      "data.source": "source_2",
+      "config": {
+        "expr": "name",
+        "bad.record.definition": "duplicate"
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "duplication_metric",
+          "flatten": "default"
+        },
+        {
+          "type": "record",
+          "name": "duplication_records"
+        }
+      ]
     }
   ],
   "sinks": [
diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json
index 331b242..8dd1948 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -10,7 +10,7 @@
       "name": "consoleSink",
       "type": "CONSOLE",
       "config": {
-        "numRows": 40
+        "numRows": 20
       }
     },
     {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
index 8d73e1b..a6bba55 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
@@ -25,7 +25,7 @@
 
   type MeasureType = Value
 
-  val Completeness, Profiling, SparkSQL = Value
+  val Completeness, Duplication, Profiling, SparkSQL = Value
 
   override def withNameWithDefault(name: String): MeasureType = {
     super.withNameWithDefault(name)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index e1f10e8..c38752a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -80,6 +80,7 @@
   private def createMeasure(measureParam: MeasureParam): Measure = {
     measureParam.getType match {
       case MeasureTypes.Completeness => CompletenessMeasure(measureParam)
+      case MeasureTypes.Duplication => DuplicationMeasure(measureParam)
       case MeasureTypes.Profiling => ProfilingMeasure(measureParam)
       case MeasureTypes.SparkSQL => SparkSQLMeasure(measureParam)
       case _ =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index b1178f8..6352be4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -42,8 +42,8 @@
       case Some(exprStr) => when(expr(exprStr), 1.0).otherwise(0.0)
       case None =>
         error(
-          s"$Expression was not defined for completeness measures",
-          new IllegalArgumentException(s"$Expression was not defined for completeness measures"))
+          s"$Expression was not defined for completeness measure.",
+          new IllegalArgumentException(s"$Expression was not defined for completeness measure."))
         throw new IllegalArgumentException(
-          s"$Expression was not defined for completeness measures")
+          s"$Expression was not defined for completeness measure.")
     }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
new file mode 100644
index 0000000..971a2fd
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure
+import org.apache.griffin.measure.step.builder.ConstantColumns
+
+/**
+ * Duplication Measure
+ *
+ * Definition of duplication measures used:
+ *  - Duplicate: the number of values that are the same as other values in the list
+ *  - Distinct: the number of non-null values that are different from each other (Non-unique + Unique)
+ *  - Non Unique: the number of values that have at least one duplicate in the list
+ *  - Unique: the number of values that have no duplicates
+ *
+ * @param measureParam Measure Param
+ */
+case class DuplicationMeasure(measureParam: MeasureParam) extends Measure {
+
+  import Measure._
+
+  private final val BadRecordDefinition = "bad.record.definition"
+
+  private final val Duplicate: String = "duplicate"
+  private final val Unique: String = "unique"
+  private final val NonUnique: String = "non_unique"
+  private final val Distinct: String = "distinct"
+  private final val __Temp: String = "__temp"
+
+  private final val duplicationMeasures = Seq(Duplicate, Unique, NonUnique, Distinct)
+
+  val exprs: String = getFromConfig[String](Expression, null)
+  private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)
+
+  validate()
+
+  override val supportsRecordWrite: Boolean = true
+
+  override val supportsMetricWrite: Boolean = true
+
+  override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
+    val input = sparkSession.read.table(measureParam.getDataSource)
+
+    val duplicateCol = when(col(__Temp) > 1, 1).otherwise(0)
+    val uniqueCol = when(col(Unique) =!= 1, 0).otherwise(1)
+    val distinctCol = when(col(Unique) === 1 or col(NonUnique) === 1, 1).otherwise(0)
+    val nonUniqueCol =
+      when(col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1).otherwise(0)
+
+    val keyCols = {
+      if (StringUtils.isEmpty(exprs)) input.columns
+      else exprs.split(",").map(_.trim)
+    }.distinct
+
+    keyCols.foreach(c =>
+      assert(input.columns.contains(c), s"Provided column '$c' does not exist in the dataset."))
+
+    val window = Window.partitionBy(keyCols.map(col): _*).orderBy(ConstantColumns.tmst)
+
+    val aggDf = input
+      .select(col("*"), row_number().over(window).as(__Temp))
+      .withColumn(Duplicate, duplicateCol)
+      .withColumn(Unique, count(lit(1)).over(window))
+      .withColumn(Unique, uniqueCol)
+      .withColumn(NonUnique, min(__Temp).over(window))
+      .withColumn(NonUnique, nonUniqueCol)
+      .withColumn(Distinct, distinctCol)
+      .withColumn(valueColumn, col(badnessExpr))
+      .drop(__Temp)
+
+    val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
+    val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e)))
+    val metricColumn = map(selectCols: _*).as(valueColumn)
+    val metricDf = aggDf
+      .agg(metricAggCols.head, metricAggCols.tail: _*)
+      .select(metricColumn)
+
+    val badRecordsDf = aggDf.drop(duplicationMeasures: _*)
+
+    (badRecordsDf, metricDf)
+  }
+
+  private def validate(): Unit = {
+    assert(
+      !StringUtils.isEmpty(badnessExpr),
+      s"Invalid value '$badnessExpr' provided for $BadRecordDefinition")
+
+    assert(badnessExpr match {
+      case Duplicate | Unique | NonUnique | Distinct => true
+      case _ => false
+    }, s"Invalid value '$badnessExpr' provided for $BadRecordDefinition")
+  }
+}
diff --git a/measure/src/test/resources/duplicates_users_info_src.csv b/measure/src/test/resources/duplicates_users_info_src.csv
new file mode 100644
index 0000000..1dd54ff
--- /dev/null
+++ b/measure/src/test/resources/duplicates_users_info_src.csv
@@ -0,0 +1,5 @@
+1,John Smith,Male
+2,John Smith,Male
+3,Rebecca Davis,Female
+4,Paul Adams,Male
+