[GRIFFIN-358] Added code documentation for all new measures.
diff --git a/griffin-doc/measure/measure-configuration-guide/duplication.md b/griffin-doc/measure/measure-configuration-guide/duplication.md
index 1a2c5ec..b256d5d 100644
--- a/griffin-doc/measure/measure-configuration-guide/duplication.md
+++ b/griffin-doc/measure/measure-configuration-guide/duplication.md
@@ -180,6 +180,7 @@
       "data_source": "crime_report_source",
       "metrics": {
         "duplicate": "4363",
+        "total": "4617",
         "unique": "58",
         "non_unique": "196",
         "distinct": "254"
diff --git a/griffin-doc/measure/measure-configuration-guide/profiling.md b/griffin-doc/measure/measure-configuration-guide/profiling.md
index c3bf602..76ec9cc 100644
--- a/griffin-doc/measure/measure-configuration-guide/profiling.md
+++ b/griffin-doc/measure/measure-configuration-guide/profiling.md
@@ -49,7 +49,7 @@
 Data Profiling helps us create a huge amount of insight into the quality levels of our data and helps to find data
 quality rules and requirements that will support a more thorough data quality assessment in a later step. For example,
 data profiling can help us to discover value frequencies, formats and patterns for each attribute in the data asset.
-Using data profiling alone we can find some perceived defects and outliers in the data asset and we end up with a whole
+Using data profiling alone we can find some perceived defects and outliers in the data asset, and we end up with a whole
 range of clues based on which correct Quality assessment measures can be defined like completeness/ distinctness etc.
 
 ### Configuration
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index 1f49328..966109e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -122,11 +122,10 @@
    * @param dataSourceName name of data source
    * @param f function to perform
    * @param measureCountByDataSource number of measures for each data source
-   * @tparam T return type of function `f`
    * @return
    */
-  private def withCacheIfNecessary[T](dataSourceName: String, f: => T)(
-      implicit measureCountByDataSource: Map[String, Int]): T = {
+  private def withCacheIfNecessary(dataSourceName: String, f: => Unit)(
+      implicit measureCountByDataSource: Map[String, Int]): Unit = {
     val numMeasures = measureCountByDataSource(dataSourceName)
     var isCached = false
     if (cacheDataSources && numMeasures > 1) {
@@ -185,7 +184,7 @@
           info(s"Successfully executed measure with name '${task._1}' $batchDetailsOpt")
         case Failure(e) =>
           error(s"Error executing measure with name '${task._1}' $batchDetailsOpt", e)
-    })
+      })
 
     while (!tasks.forall(_._2.isCompleted)) {
       info(
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
index 2c65ddd..44f5758 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
@@ -27,24 +27,78 @@
 import org.apache.griffin.measure.execution.Measure
 import org.apache.griffin.measure.step.builder.ConstantColumns
 
+/**
+ * Accuracy Measure.
+ *
+ * Data accuracy refers to the degree to which the values of a said attribute in a data source agree
+ * with an identified reference truth data (source of correct information).
+ * In-accurate data may come from different sources like,
+ *   - Dynamically computed values,
+ *   - the result of a manual workflow,
+ *   - irate customers, etc.
+ *
+ * Accuracy measure quantifies the extent to which data sets contains are correct, reliable and certified
+ * values that are free of error. Higher accuracy values signify that the said data set represents
+ * the "real-life" values/ objects that it intends to model.
+ *
+ * Accuracy measure is comparative in nature - attributes of data source to be checked are compared with
+ * attributes of another reference source. Thus, unlike other measures/ dimensions, Accuracy
+ * relies on definition of 2 sources,
+ *   - the reference (truth) source which contains the good/ correct/ accurate values.
+ *   - the actual data source to be assessed and measured for data accuracy.
+ *
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
 case class AccuracyMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
     extends Measure {
 
+  /**
+   * Representation of a single accuracy expression object.
+   *
+   * @param sourceCol name of source column
+   * @param refCol name of reference column
+   */
   case class AccuracyExpr(sourceCol: String, refCol: String)
 
   import AccuracyMeasure._
   import Measure._
 
+  /**
+   * Accuracy measure supports record and metric write
+   */
   override val supportsRecordWrite: Boolean = true
 
   override val supportsMetricWrite: Boolean = true
 
-  val refSource: String = getFromConfig[String](ReferenceSourceStr, null)
+  /**
+   * The value for expr is a json array of comparison objects where each object has 2 fields -
+   * `source.col` and `ref.col` which must be actual columns in the source and reference data sets respectively.
+   * This key is mandatory and expr array must not be empty i.e. at least one comparison must be defined.
+   */
   val exprOpt: Option[Seq[Map[String, String]]] =
     Option(getFromConfig[Seq[Map[String, String]]](Expression, null))
 
+  /**
+   * This is a mandatory parameter which selects the data source which will be used as reference.
+   * This is a mandatory parameter and this data source must be defined in the sources section
+   * of the application configuration.
+   */
+  val refSource: String = getFromConfig[String](ReferenceSourceStr, null)
+
   validate()
 
+  /**
+   * Performs a measurement of common values as a join between the mapped columns of the reference and source
+   * data sets.
+   *
+   * Accuracy produces the following 3 metrics as result,
+   *  - Total records
+   *  - Accurate records
+   *  - In accurate records
+   *
+   *  @return tuple of records dataframe and metric dataframe
+   */
   override def impl(): (DataFrame, DataFrame) = {
     val originalSource = sparkSession.read.table(measureParam.getDataSource)
     val originalCols = originalSource.columns
@@ -91,6 +145,23 @@
     (recordsDf, metricDf)
   }
 
+  /**
+   * JSON representation of the  `expr` is deserialized as Map internally which is now converted to an
+   * `AccuracyExpr` representation for a fixed structure across all expression object(s).
+   *
+   * @param map map representation of the `expr`
+   * @return instance of `AccuracyExpr`
+   */
+  private def toAccuracyExpr(map: Map[String, String]): AccuracyExpr = {
+    assert(map.contains(SourceColStr), s"'$SourceColStr' must be defined.")
+    assert(map.contains(ReferenceColStr), s"'$ReferenceColStr' must be defined.")
+
+    AccuracyExpr(map(SourceColStr), map(ReferenceColStr))
+  }
+
+  /**
+   * Validates if the expression is not null and non empty along with some dataset specific validations.
+   */
   override def validate(): Unit = {
     assert(exprOpt.isDefined, s"'$Expression' must be defined.")
     assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
@@ -99,17 +170,6 @@
       !StringUtil.isNullOrEmpty(refSource),
       s"'$ReferenceSourceStr' must not be null, empty or of invalid type.")
 
-    datasetValidations()
-  }
-
-  private def toAccuracyExpr(map: Map[String, String]): AccuracyExpr = {
-    assert(map.contains(SourceColStr), s"'$SourceColStr' must be defined.")
-    assert(map.contains(ReferenceColStr), s"'$ReferenceColStr' must be defined.")
-
-    AccuracyExpr(map(SourceColStr), map(ReferenceColStr))
-  }
-
-  private def datasetValidations(): Unit = {
     assert(
       sparkSession.catalog.tableExists(refSource),
       s"Reference source with name '$refSource' does not exist.")
@@ -145,17 +205,35 @@
         s"do not exist in reference data set with name '$refSource'")
   }
 
+  /**
+   * Helper method to prepend a prefix to all column names to uniquely identify them.
+   * In case if they exist in both source and target data sets there is no collision.
+   *
+   * @param dataFrame data set
+   * @param prefix prefix to set
+   * @return
+   */
   private def addColumnPrefix(dataFrame: DataFrame, prefix: String): DataFrame = {
     val columns = dataFrame.columns
     columns.foldLeft(dataFrame)((df, c) => df.withColumnRenamed(c, s"$prefix$c"))
   }
 
+  /**
+   * Helper method to strip a prefix from all column names that previously helped in uniquely identify them.
+   *
+   * @param dataFrame data set
+   * @param prefix prefix to remove
+   * @return
+   */
   private def removeColumnPrefix(dataFrame: DataFrame, prefix: String): DataFrame = {
     val columns = dataFrame.columns
     columns.foldLeft(dataFrame)((df, c) => df.withColumnRenamed(c, c.stripPrefix(prefix)))
   }
 }
 
+/**
+ * Accuracy measure constants
+ */
 object AccuracyMeasure {
   final val SourcePrefixStr: String = "__source_"
   final val refPrefixStr: String = "__ref_"
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index 70a4a89..f8ea81d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -24,22 +24,63 @@
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure
 
+/**
+ * Completeness Measure.
+ *
+ * Completeness refers to the degree to which values are present in a data collection.
+ * When data is incomplete due to unavailability (missing records), this does not represent a lack of completeness.
+ * As far as an individual datum is concerned, only two situations are possible - either a value is assigned
+ * to the attribute in question or not. The latter case is usually represented by a `null` value.
+ *
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
 case class CompletenessMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
     extends Measure {
 
   import Measure._
 
+  /**
+   * Completeness Constants
+   */
   final val Complete: String = "complete"
   final val InComplete: String = "incomplete"
 
+  /**
+   * Completeness measure supports record and metric write
+   */
   override val supportsRecordWrite: Boolean = true
 
   override val supportsMetricWrite: Boolean = true
 
+  /**
+   * The value for expr is a SQL-like expression string which definition this completeness.
+   * For more complex definitions, expressions can be clubbed with AND and OR.
+   *
+   * For a tabular data set with columns name, email and age, some examples of `expr` are mentioned below,
+   *   - name is NULL
+   *   - name is NULL and age is NULL
+   *   - email NOT RLIKE `'^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'` (without ` and `)
+   * Note: This expression describes the bad or incomplete records. This means that
+   * for "expr": "zipcode is NULL" the records which contain null in zipcode column are considered as incomplete.
+   */
   val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
 
   validate()
 
+  /**
+   * Completeness Measure.
+   *
+   * Completeness evaluates the user provided `expr` for each row of the input dataset.
+   * Each row that fails this expression is tagged as incomplete record(s), all other record(s) are complete.
+   *
+   * Completeness produces the following 3 metrics as result,
+   *  - Total records
+   *  - Complete records
+   *  - Incomplete records
+   *
+   *  @return tuple of records dataframe and metric dataframe
+   */
   override def impl(): (DataFrame, DataFrame) = {
     val exprStr = exprOpt.get
 
@@ -59,6 +100,9 @@
     (badRecordsDf, metricDf)
   }
 
+  /**
+   * Validates if expression is defined and is non empty.
+   */
   override def validate(): Unit = {
     assert(exprOpt.isDefined, s"'$Expression' must be defined.")
     assert(exprOpt.nonEmpty, s"'$Expression' must not be empty.")
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
index f8c5214..a325b90 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -27,15 +27,23 @@
 import org.apache.griffin.measure.execution.Measure
 
 /**
- * Duplication Measure
+ * Duplication Measure.
  *
- * Definition of duplication measures used:
+ * Asserting the measure of duplication of the entities within a data set implies that
+ * no entity exists more than once within the data set and that there is a key that can be used
+ * to uniquely access each entity.
+ *
+ * For example, in a master product table, each product must appear once and be assigned a unique
+ * identifier that represents that product within a system or across multiple applications/ systems.
+ *
+ * Duplication measures the redundancies in a dataset in terms of the following metrics,
  *  - Duplicate: the number of values that are the same as other values in the list
  *  - Distinct: the number of non-null values that are different from each other (Non-unique + Unique)
  *  - Non Unique: the number of values that have at least one duplicate in the list
  *  - Unique: the number of values that have no duplicates
  *
- * @param measureParam Measure Param
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
  */
 case class DuplicationMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
     extends Measure {
@@ -43,17 +51,49 @@
   import DuplicationMeasure._
   import Measure._
 
-  private final val duplicationMeasures = Seq(Duplicate, Unique, NonUnique, Distinct)
+  /**
+   * Metrics of redundancies
+   */
+  private final val duplicationMeasures = Seq(Total, Duplicate, Unique, NonUnique, Distinct)
 
+  /**
+   * The value for `expr` is a comma separated string of columns in the data asset on which the
+   * duplication measure is to be executed. `expr` is an optional key for Duplication measure, i.e.,
+   * if it is not defined, the entire row will be checked by duplication measure.
+   */
   val exprs: String = getFromConfig[String](Expression, null)
+
+  /**
+   * Its value defines what exactly would be considered as a bad record after this measure
+   * computes redundancies on the data asset. Since the redundancies are calculated as `duplicate`,
+   * `unique`, `non_unique`, and  `distinct`, the value of this key must also be one of these values.
+   * This key is mandatory and must be defined with appropriate value.
+   */
   private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)
 
   validate()
 
+  /**
+   * Duplication measure supports record and metric write
+   */
   override val supportsRecordWrite: Boolean = true
 
   override val supportsMetricWrite: Boolean = true
 
+  /**
+   * The Duplication measure calculates the all metrics of redundancies for the input dataset.
+   * Users can choose which of these metrics defines a "bad record" for them by defining `BadRecordDefinition`
+   * with a supported value.
+   *
+   * Duplication produces the following 5 metrics as result,
+   *  - Total records
+   *  - Duplicate records
+   *  - Unique records
+   *  - NonUnique records
+   *  - Distinct records
+   *
+   *  @return tuple of records dataframe and metric dataframe
+   */
   override def impl(): (DataFrame, DataFrame) = {
     val input = sparkSession.read.table(measureParam.getDataSource)
     val cols = keyCols(input).map(col)
@@ -78,12 +118,14 @@
       .withColumn(NonUnique, min(__Temp).over(window))
       .withColumn(NonUnique, nonUniqueCol)
       .withColumn(Distinct, distinctCol)
+      .withColumn(Total, lit(1))
       .withColumn(valueColumn, col(badnessExpr))
       .drop(__Temp, IsNull)
 
     val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
     val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e).cast("string")))
     val metricColumn = map(selectCols: _*).as(valueColumn)
+
     val metricDf = aggDf
       .agg(metricAggCols.head, metricAggCols.tail: _*)
       .select(metricColumn)
@@ -93,6 +135,10 @@
     (badRecordsDf, metricDf)
   }
 
+  /**
+   * Since `expr` is a comma separated string of columns, these provided columns must exist in the dataset.
+   * `BadRecordDefinition` must be defined with one of the supported values.
+   */
   override def validate(): Unit = {
     val input = sparkSession.read.table(measureParam.getDataSource)
     val kc = keyCols(input)
@@ -118,6 +164,9 @@
 
 }
 
+/**
+ * Duplication measure constants
+ */
 object DuplicationMeasure {
   final val IsNull: String = "is_null"
   final val Duplicate: String = "duplicate"
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
index 36fe790..01b0a65 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
@@ -28,22 +28,90 @@
 import org.apache.griffin.measure.execution.Measure._
 import org.apache.griffin.measure.step.builder.ConstantColumns
 
+/**
+ * Profiling measure.
+ *
+ * Data processing and its analysis can't truly be complete without data profiling -
+ * reviewing source data for content and quality. Data profiling helps to find data quality rules and
+ * requirements that will support a more thorough data quality assessment in a later step.
+ *
+ * The process of Data profiling involves:
+ *   - Collecting descriptive statistics like min, max, count and sum
+ *   - Collecting data types, length and recurring patterns
+ *   - Discovering metadata and assessing its accuracy, etc.
+ *
+ * A common problem in data management circles is the confusion around what is meant by
+ * Data profiling as opposed to Data Quality Assessment due to the interchangeable use of these 2 terms.
+ *
+ * Data Profiling helps us create a huge amount of insight into the quality levels of our
+ * data and helps to find data quality rules and requirements that will support a more thorough
+ * data quality assessment in a later step. For example, data profiling can help us to discover value
+ * frequencies, formats and patterns for each attribute in the data asset. Using data profiling alone
+ * we can find some perceived defects and outliers in the data asset, and we end up with a whole
+ * range of clues based on which correct Quality assessment measures can be defined like
+ * completeness/ distinctness etc.
+ *
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
 case class ProfilingMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
     extends Measure {
 
   import ProfilingMeasure._
 
+  /**
+   * Profiling measure supports only metric write
+   */
   override val supportsRecordWrite: Boolean = false
 
   override val supportsMetricWrite: Boolean = true
 
+  /**
+   * The value for `expr` is a comma separated string of columns in the data asset on which the
+   * profiling measure is to be executed. `expr` is an optional key for Profiling measure,
+   * i.e., if it is not defined, all columns in the data set will be profiled.
+   */
+  val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
+
+  /**
+   * The value for this key is boolean. If this is `true`, the distinct counts will be approximated
+   * to allow up to 5% error. Approximate counts are usually faster by are less accurate. If this is set
+   * to `false`, then the counts will be 100% accurate.
+   */
   val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)
+
+  /**
+   * Several resultant metrics of profiling measure are floating-point numbers. This key controls to extent
+   * to which these floating-point numbers are rounded. For example, if `round.scale = 2` then all
+   * floating-point metric values will be rounded to 2 decimal places.
+   */
   val approxDistinctCount: Boolean =
     getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)
 
+  /**
+   * Various metrics are calculated for columns of the data set. If expr is correctly defined,
+   * then metrics are generated for only the given subset of columns else, its generated for all columns.
+   *
+   * List of profiling metrics that are generated,
+   *  - avg_col_len
+   *  - max_col_len
+   *  - min_col_len
+   *  - avg
+   *  - max
+   *  - min
+   *  - approx_distinct_count OR distinct_count
+   *  - variance
+   *  - kurtosis
+   *  - std_dev
+   *  - total
+   *  - data_type
+   *
+   *  @return tuple of records dataframe and metric dataframe
+   */
   override def impl(): (DataFrame, DataFrame) = {
     val input = sparkSession.read.table(measureParam.getDataSource)
-    val profilingColNames = getFromConfig[String](Expression, input.columns.mkString(","))
+    val profilingColNames = exprOpt
+      .getOrElse(input.columns.mkString(","))
       .split(",")
       .map(_.trim.toLowerCase(Locale.ROOT))
       .toSet
@@ -91,6 +159,9 @@
 
 }
 
+/**
+ * Profiling measure constants
+ */
 object ProfilingMeasure {
 
   /**
@@ -136,6 +207,14 @@
     (if (t.isInstanceOf[NumericType]) value else lit(null)).as(alias)
   }
 
+  /**
+   * Calculates profiling metrics for a column.
+   *
+   * @param field column
+   * @param roundScale round off places.
+   * @param approxDistinctCount to approximate distinct or not.
+   * @return
+   */
   private def getProfilingExprs(
       field: StructField,
       roundScale: Int,
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
index e4f5a90..5b7394c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
@@ -26,23 +26,67 @@
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure
 
+/**
+ * SparkSQL Measure.
+ *
+ * In some cases, the pre-defined dimensions/ measures may not enough to model a complete
+ * data quality definition. For such cases, Apache Griffin allows the definition of complex
+ * custom user-defined checks as SparkSQL queries.
+ *
+ * SparkSQL measure is like a pro mode that allows advanced users to configure complex custom checks
+ * that are not covered by other measures. These SparkSQL queries may contain clauses like
+ * select, from, where, group-by, order-by , limit, join, etc.
+ *
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
 case class SparkSQLMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
     extends Measure {
 
   import Measure._
 
+  /**
+   * SparkSQL measure constants
+   */
   final val Complete: String = "complete"
   final val InComplete: String = "incomplete"
 
+  /**
+   * SparkSQL measure supports record and metric write
+   */
   override val supportsRecordWrite: Boolean = true
 
   override val supportsMetricWrite: Boolean = true
 
+  /**
+   * The value for expr is a valid SparkSQL query string. This is a mandatory parameter.
+   */
   private val expr = getFromConfig[String](Expression, StringUtils.EMPTY)
+
+  /**
+   * As the key suggests, its value defines what exactly would be considered as a bad record
+   * after this query executes. In order to separate the good data from bad data, a
+   * bad.record.definition expression must be set. This expression can be a SparkSQL like
+   * expression and must yield a column with boolean data type.
+   *
+   * Note: This expression describes the bad records, i.e. if bad.record.definition = true
+   * for a record, it is marked as bad/ incomplete record.
+   */
   private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)
 
   validate()
 
+  /**
+   * Runs the user provided SparkSQL query and marks the records as complete/ incomplete based on the
+   * `BadRecordDefinition`.
+   *
+   * SparkSQL produces the following 3 metrics as result,
+   *  - Total records
+   *  - Complete records
+   *  - Incomplete records
+   *
+   *  @return tuple of records dataframe and metric dataframe
+   */
   override def impl(): (DataFrame, DataFrame) = {
     val df = sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))
 
@@ -64,6 +108,9 @@
     (badRecordsDf, metricDf)
   }
 
+  /**
+   * Validates if the `Expression` and `BadRecordDefinition` are not null and non empty.
+   */
   override def validate(): Unit = {
     assert(
       !StringUtil.isNullOrEmpty(expr),
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
index 9d5b9ed..cc5d6d6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -125,6 +125,7 @@
     assertResult(metricMap(Unique))("5")
     assertResult(metricMap(NonUnique))("0")
     assertResult(metricMap(Distinct))("5")
+    assertResult(metricMap(Total))("5")
   }
 
   it should "supported complex measure expr" in {
@@ -149,6 +150,7 @@
     assertResult(metricMap(Unique))("2")
     assertResult(metricMap(NonUnique))("1")
     assertResult(metricMap(Distinct))("3")
+    assertResult(metricMap(Total))("5")
   }
 
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index 30c102a..39d1a7f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -115,7 +115,12 @@
   "duplication batch job" should "work" in {
     dqApp = runApp("/_distinctness-batch-griffindsl.json")
     val expectedMetrics =
-      Map("duplicate" -> "1", "unique" -> "48", "non_unique" -> "1", "distinct" -> "49")
+      Map(
+        "duplicate" -> "1",
+        "unique" -> "48",
+        "non_unique" -> "1",
+        "distinct" -> "49",
+        "total" -> "50")
 
     runAndCheckResult(Map("duplication_measure" -> expectedMetrics))
   }