[GRIFFIN-358] Added code documentation for all new measures.
diff --git a/griffin-doc/measure/measure-configuration-guide/duplication.md b/griffin-doc/measure/measure-configuration-guide/duplication.md
index 1a2c5ec..b256d5d 100644
--- a/griffin-doc/measure/measure-configuration-guide/duplication.md
+++ b/griffin-doc/measure/measure-configuration-guide/duplication.md
@@ -180,6 +180,7 @@
"data_source": "crime_report_source",
"metrics": {
"duplicate": "4363",
+ "total": "4617",
"unique": "58",
"non_unique": "196",
"distinct": "254"
diff --git a/griffin-doc/measure/measure-configuration-guide/profiling.md b/griffin-doc/measure/measure-configuration-guide/profiling.md
index c3bf602..76ec9cc 100644
--- a/griffin-doc/measure/measure-configuration-guide/profiling.md
+++ b/griffin-doc/measure/measure-configuration-guide/profiling.md
@@ -49,7 +49,7 @@
Data Profiling helps us create a huge amount of insight into the quality levels of our data and helps to find data
quality rules and requirements that will support a more thorough data quality assessment in a later step. For example,
data profiling can help us to discover value frequencies, formats and patterns for each attribute in the data asset.
-Using data profiling alone we can find some perceived defects and outliers in the data asset and we end up with a whole
+Using data profiling alone we can find some perceived defects and outliers in the data asset, and we end up with a whole
range of clues based on which correct Quality assessment measures can be defined like completeness/ distinctness etc.
### Configuration
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index 1f49328..966109e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -122,11 +122,10 @@
* @param dataSourceName name of data source
* @param f function to perform
* @param measureCountByDataSource number of measures for each data source
- * @tparam T return type of function `f`
* @return
*/
- private def withCacheIfNecessary[T](dataSourceName: String, f: => T)(
- implicit measureCountByDataSource: Map[String, Int]): T = {
+ private def withCacheIfNecessary(dataSourceName: String, f: => Unit)(
+ implicit measureCountByDataSource: Map[String, Int]): Unit = {
val numMeasures = measureCountByDataSource(dataSourceName)
var isCached = false
if (cacheDataSources && numMeasures > 1) {
@@ -185,7 +184,7 @@
info(s"Successfully executed measure with name '${task._1}' $batchDetailsOpt")
case Failure(e) =>
error(s"Error executing measure with name '${task._1}' $batchDetailsOpt", e)
- })
+ })
while (!tasks.forall(_._2.isCompleted)) {
info(
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
index 2c65ddd..44f5758 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
@@ -27,24 +27,78 @@
import org.apache.griffin.measure.execution.Measure
import org.apache.griffin.measure.step.builder.ConstantColumns
+/**
+ * Accuracy Measure.
+ *
+ * Data accuracy refers to the degree to which the values of a given attribute in a data source
+ * agree with identified reference truth data (a source of correct information).
+ * Inaccurate data may come from different sources, like
+ * - dynamically computed values,
+ * - the result of a manual workflow,
+ * - irate customers, etc.
+ *
+ * Accuracy measure quantifies the extent to which a data set contains correct, reliable and
+ * certified values that are free of error. Higher accuracy values signify that the data set
+ * represents the "real-life" values/ objects that it intends to model.
+ *
+ * Accuracy measure is comparative in nature - attributes of data source to be checked are compared with
+ * attributes of another reference source. Thus, unlike other measures/ dimensions, Accuracy
+ * relies on the definition of 2 sources,
+ * - the reference (truth) source which contains the good/ correct/ accurate values.
+ * - the actual data source to be assessed and measured for data accuracy.
+ *
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
case class AccuracyMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
+ /**
+ * Representation of a single accuracy expression object.
+ *
+ * @param sourceCol name of source column
+ * @param refCol name of reference column
+ */
case class AccuracyExpr(sourceCol: String, refCol: String)
import AccuracyMeasure._
import Measure._
+ /**
+ * Accuracy measure supports record and metric write
+ */
override val supportsRecordWrite: Boolean = true
override val supportsMetricWrite: Boolean = true
- val refSource: String = getFromConfig[String](ReferenceSourceStr, null)
+ /**
+ * The value for `expr` is a JSON array of comparison objects, where each object has 2 fields,
+ * `source.col` and `ref.col`, which must be actual columns in the source and reference data sets respectively.
+ * This key is mandatory, and the `expr` array must not be empty, i.e., at least one comparison must be defined.
+ */
val exprOpt: Option[Seq[Map[String, String]]] =
Option(getFromConfig[Seq[Map[String, String]]](Expression, null))
+ /**
+ * Selects the data source which will be used as the reference source. This is a mandatory
+ * parameter, and the selected data source must be defined in the sources section of the
+ * application configuration.
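+ *
+ * For example, both mandatory keys together might look like the following (source and column
+ * names are illustrative, and `ref.source` is assumed to be the key string behind
+ * `ReferenceSourceStr`):
+ * {{{
+ *   "ref.source": "crime_report_truth",
+ *   "expr": [
+ *     { "source.col": "city", "ref.col": "city_name" }
+ *   ]
+ * }}}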
+ */
+ val refSource: String = getFromConfig[String](ReferenceSourceStr, null)
+
validate()
+ /**
+ * Performs a measurement of common values as a join between the mapped columns of the reference and source
+ * data sets.
+ *
+ * Accuracy produces the following 3 metrics as result,
+ * - Total records
+ * - Accurate records
+ * - Inaccurate records
+ *
+ * @return tuple of records dataframe and metric dataframe
+ */
override def impl(): (DataFrame, DataFrame) = {
val originalSource = sparkSession.read.table(measureParam.getDataSource)
val originalCols = originalSource.columns
@@ -91,6 +145,23 @@
(recordsDf, metricDf)
}
+ /**
+ * The JSON representation of the `expr` is internally deserialized as a Map, which is then
+ * converted to an `AccuracyExpr` representation for a fixed structure across all expression object(s).
+ *
+ * @param map map representation of the `expr`
+ * @return instance of `AccuracyExpr`
+ */
+ private def toAccuracyExpr(map: Map[String, String]): AccuracyExpr = {
+ assert(map.contains(SourceColStr), s"'$SourceColStr' must be defined.")
+ assert(map.contains(ReferenceColStr), s"'$ReferenceColStr' must be defined.")
+
+ AccuracyExpr(map(SourceColStr), map(ReferenceColStr))
+ }
+
+ /**
+ * Validates that the expression is not null and non-empty, along with some dataset-specific validations.
+ */
override def validate(): Unit = {
assert(exprOpt.isDefined, s"'$Expression' must be defined.")
assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
@@ -99,17 +170,6 @@
!StringUtil.isNullOrEmpty(refSource),
s"'$ReferenceSourceStr' must not be null, empty or of invalid type.")
- datasetValidations()
- }
-
- private def toAccuracyExpr(map: Map[String, String]): AccuracyExpr = {
- assert(map.contains(SourceColStr), s"'$SourceColStr' must be defined.")
- assert(map.contains(ReferenceColStr), s"'$ReferenceColStr' must be defined.")
-
- AccuracyExpr(map(SourceColStr), map(ReferenceColStr))
- }
-
- private def datasetValidations(): Unit = {
assert(
sparkSession.catalog.tableExists(refSource),
s"Reference source with name '$refSource' does not exist.")
@@ -145,17 +205,35 @@
s"do not exist in reference data set with name '$refSource'")
}
+ /**
+ * Helper method to prepend a prefix to all column names to uniquely identify them.
+ * This avoids collisions when the same columns exist in both the source and reference data sets.
+ *
+ * @param dataFrame data set
+ * @param prefix prefix to prepend
+ * @return
+ */
private def addColumnPrefix(dataFrame: DataFrame, prefix: String): DataFrame = {
val columns = dataFrame.columns
columns.foldLeft(dataFrame)((df, c) => df.withColumnRenamed(c, s"$prefix$c"))
}
+ /**
+ * Helper method to strip a prefix from all column names that previously helped to uniquely identify them.
+ *
+ * @param dataFrame data set
+ * @param prefix prefix to remove
+ * @return
+ */
private def removeColumnPrefix(dataFrame: DataFrame, prefix: String): DataFrame = {
val columns = dataFrame.columns
columns.foldLeft(dataFrame)((df, c) => df.withColumnRenamed(c, c.stripPrefix(prefix)))
}
}
+/**
+ * Accuracy measure constants
+ */
object AccuracyMeasure {
final val SourcePrefixStr: String = "__source_"
final val refPrefixStr: String = "__ref_"
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index 70a4a89..f8ea81d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -24,22 +24,63 @@
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
+/**
+ * Completeness Measure.
+ *
+ * Completeness refers to the degree to which values are present in a data collection.
+ * When data is incomplete due to unavailability (missing records), this does not represent a lack of completeness.
+ * As far as an individual datum is concerned, only two situations are possible - either a value is assigned
+ * to the attribute in question or not. The latter case is usually represented by a `null` value.
+ *
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
case class CompletenessMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
import Measure._
+ /**
+ * Completeness Constants
+ */
final val Complete: String = "complete"
final val InComplete: String = "incomplete"
+ /**
+ * Completeness measure supports record and metric write
+ */
override val supportsRecordWrite: Boolean = true
override val supportsMetricWrite: Boolean = true
+ /**
+ * The value for `expr` is a SQL-like expression string which defines completeness for this measure.
+ * For more complex definitions, expressions can be combined with AND and OR.
+ *
+ * For a tabular data set with columns name, email and age, some examples of `expr` are mentioned below,
+ * - name is NULL
+ * - name is NULL and age is NULL
+ * - email NOT RLIKE '^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'
+ * Note: This expression describes the bad or incomplete records. This means that
+ * for "expr": "zipcode is NULL" the records which contain null in zipcode column are considered as incomplete.
+ */
val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
validate()
+ /**
+ * Completeness evaluates the user-provided `expr` for each row of the input data set.
+ * Each row for which this expression evaluates to true is tagged as an incomplete record;
+ * all other records are complete.
+ *
+ * Completeness produces the following 3 metrics as result,
+ * - Total records
+ * - Complete records
+ * - Incomplete records
+ *
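+ * Assuming the metric keys mirror the names above, an illustrative metric map could be,
+ * {{{
+ *   "total": "100", "complete": "90", "incomplete": "10"
+ * }}}
+ *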
+ * @return tuple of records dataframe and metric dataframe
+ */
override def impl(): (DataFrame, DataFrame) = {
val exprStr = exprOpt.get
@@ -59,6 +100,9 @@
(badRecordsDf, metricDf)
}
+ /**
+ * Validates that the expression is defined and non-empty.
+ */
override def validate(): Unit = {
assert(exprOpt.isDefined, s"'$Expression' must be defined.")
assert(exprOpt.nonEmpty, s"'$Expression' must not be empty.")
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
index f8c5214..a325b90 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -27,15 +27,23 @@
import org.apache.griffin.measure.execution.Measure
/**
- * Duplication Measure
+ * Duplication Measure.
*
- * Definition of duplication measures used:
+ * Asserting uniqueness of the entities within a data set implies that
+ * no entity exists more than once within the data set and that there is a key that can be used
+ * to uniquely access each entity.
+ *
+ * For example, in a master product table, each product must appear once and be assigned a unique
+ * identifier that represents that product within a system or across multiple applications/ systems.
+ *
+ * Duplication measures the redundancies in a dataset in terms of the following metrics,
* - Duplicate: the number of values that are the same as other values in the list
* - Distinct: the number of non-null values that are different from each other (Non-unique + Unique)
* - Non Unique: the number of values that have at least one duplicate in the list
* - Unique: the number of values that have no duplicates
*
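+ * For example, for a single-column data set with values [a, a, b, c, c, c]:
+ * total = 6, unique = 1 (only b has no duplicates), non_unique = 2 (a and c),
+ * distinct = 3 (unique + non_unique), and duplicate = 3 (the redundant copies, total - distinct).
+ *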
- * @param measureParam Measure Param
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
*/
case class DuplicationMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
@@ -43,17 +51,49 @@
import DuplicationMeasure._
import Measure._
- private final val duplicationMeasures = Seq(Duplicate, Unique, NonUnique, Distinct)
+ /**
+ * The redundancy metrics computed by this measure
+ */
+ private final val duplicationMeasures = Seq(Total, Duplicate, Unique, NonUnique, Distinct)
+ /**
+ * The value for `expr` is a comma-separated string of columns in the data asset on which the
+ * duplication measure is to be executed. `expr` is an optional key for Duplication measure, i.e.,
+ * if it is not defined, the entire row will be checked by duplication measure.
+ */
val exprs: String = getFromConfig[String](Expression, null)
+
+ /**
+ * Its value defines what exactly would be considered a bad record after this measure
+ * computes redundancies on the data asset. Since the redundancies are calculated as `duplicate`,
+ * `unique`, `non_unique`, and `distinct`, the value of this key must be one of these values.
+ * This key is mandatory and must be defined with an appropriate value.
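+ *
+ * For example, a hypothetical `config` fragment (column names are illustrative):
+ * {{{
+ *   "expr": "name, email",
+ *   "bad.record.definition": "duplicate"
+ * }}}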
+ */
private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)
validate()
+ /**
+ * Duplication measure supports record and metric write
+ */
override val supportsRecordWrite: Boolean = true
override val supportsMetricWrite: Boolean = true
+ /**
+ * The Duplication measure calculates all of the redundancy metrics for the input dataset.
+ * Users can choose which of these metrics defines a "bad record" for them by defining `BadRecordDefinition`
+ * with a supported value.
+ *
+ * Duplication produces the following 5 metrics as result,
+ * - Total records
+ * - Duplicate records
+ * - Unique records
+ * - NonUnique records
+ * - Distinct records
+ *
+ * @return tuple of records dataframe and metric dataframe
+ */
override def impl(): (DataFrame, DataFrame) = {
val input = sparkSession.read.table(measureParam.getDataSource)
val cols = keyCols(input).map(col)
@@ -78,12 +118,14 @@
.withColumn(NonUnique, min(__Temp).over(window))
.withColumn(NonUnique, nonUniqueCol)
.withColumn(Distinct, distinctCol)
+ .withColumn(Total, lit(1))
.withColumn(valueColumn, col(badnessExpr))
.drop(__Temp, IsNull)
val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e).cast("string")))
val metricColumn = map(selectCols: _*).as(valueColumn)
+
val metricDf = aggDf
.agg(metricAggCols.head, metricAggCols.tail: _*)
.select(metricColumn)
@@ -93,6 +135,10 @@
(badRecordsDf, metricDf)
}
+ /**
+ * Since `expr` is a comma-separated string of columns, the provided columns must exist in the dataset.
+ * `BadRecordDefinition` must be defined with one of the supported values.
+ */
override def validate(): Unit = {
val input = sparkSession.read.table(measureParam.getDataSource)
val kc = keyCols(input)
@@ -118,6 +164,9 @@
}
+/**
+ * Duplication measure constants
+ */
object DuplicationMeasure {
final val IsNull: String = "is_null"
final val Duplicate: String = "duplicate"
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
index 36fe790..01b0a65 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
@@ -28,22 +28,90 @@
import org.apache.griffin.measure.execution.Measure._
import org.apache.griffin.measure.step.builder.ConstantColumns
+/**
+ * Profiling measure.
+ *
+ * Data processing and its analysis can't truly be complete without data profiling -
+ * reviewing source data for content and quality. Data profiling helps to find data quality rules and
+ * requirements that will support a more thorough data quality assessment in a later step.
+ *
+ * The process of Data profiling involves:
+ * - Collecting descriptive statistics like min, max, count and sum
+ * - Collecting data types, length and recurring patterns
+ * - Discovering metadata and assessing its accuracy, etc.
+ *
+ * A common problem in data management circles is the confusion around what is meant by
+ * Data profiling as opposed to Data Quality Assessment due to the interchangeable use of these two terms.
+ *
+ * Data Profiling helps us create a huge amount of insight into the quality levels of our
+ * data and helps to find data quality rules and requirements that will support a more thorough
+ * data quality assessment in a later step. For example, data profiling can help us to discover value
+ * frequencies, formats and patterns for each attribute in the data asset. Using data profiling alone
+ * we can find some perceived defects and outliers in the data asset, and we end up with a whole
+ * range of clues based on which correct Quality assessment measures can be defined like
+ * completeness/ distinctness etc.
+ *
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
case class ProfilingMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
import ProfilingMeasure._
+ /**
+ * Profiling measure supports only metric write
+ */
override val supportsRecordWrite: Boolean = false
override val supportsMetricWrite: Boolean = true
+ /**
+ * The value for `expr` is a comma-separated string of columns in the data asset on which the
+ * profiling measure is to be executed. `expr` is an optional key for Profiling measure,
+ * i.e., if it is not defined, all columns in the data set will be profiled.
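+ *
+ * For example (column names are illustrative):
+ * {{{
+ *   "expr": "id, name, age"
+ * }}}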
+ */
+ val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
+
+ /**
+ * Several resultant metrics of the profiling measure are floating-point numbers. This key controls
+ * the extent to which these floating-point numbers are rounded. For example, if `round.scale = 2`
+ * then all floating-point metric values will be rounded to 2 decimal places.
+ */
val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)
+
+ /**
+ * The value for this key is boolean. If this is `true`, the distinct counts will be approximated
+ * to allow up to 5% error. Approximate counts are usually faster but are less accurate. If this is
+ * set to `false`, then the counts will be 100% accurate.
+ */
val approxDistinctCount: Boolean =
getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)
+ /**
+ * Various metrics are calculated for columns of the data set. If `expr` is correctly defined,
+ * then metrics are generated only for the given subset of columns; else, they are generated for all columns.
+ *
+ * List of profiling metrics that are generated,
+ * - avg_col_len
+ * - max_col_len
+ * - min_col_len
+ * - avg
+ * - max
+ * - min
+ * - approx_distinct_count OR distinct_count
+ * - variance
+ * - kurtosis
+ * - std_dev
+ * - total
+ * - data_type
+ *
+ * @return tuple of records dataframe and metric dataframe
+ */
override def impl(): (DataFrame, DataFrame) = {
val input = sparkSession.read.table(measureParam.getDataSource)
- val profilingColNames = getFromConfig[String](Expression, input.columns.mkString(","))
+ val profilingColNames = exprOpt
+ .getOrElse(input.columns.mkString(","))
.split(",")
.map(_.trim.toLowerCase(Locale.ROOT))
.toSet
@@ -91,6 +159,9 @@
}
+/**
+ * Profiling measure constants
+ */
object ProfilingMeasure {
/**
@@ -136,6 +207,14 @@
(if (t.isInstanceOf[NumericType]) value else lit(null)).as(alias)
}
+ /**
+ * Calculates profiling metrics for a column.
+ *
+ * @param field column to profile
+ * @param roundScale number of decimal places for rounding
+ * @param approxDistinctCount whether to approximate the distinct count
+ * @return
+ */
private def getProfilingExprs(
field: StructField,
roundScale: Int,
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
index e4f5a90..5b7394c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
@@ -26,23 +26,67 @@
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
+/**
+ * SparkSQL Measure.
+ *
+ * In some cases, the pre-defined dimensions/ measures may not be enough to model a complete
+ * data quality definition. For such cases, Apache Griffin allows the definition of complex
+ * custom user-defined checks as SparkSQL queries.
+ *
+ * SparkSQL measure is like a pro mode that allows advanced users to configure complex custom checks
+ * that are not covered by other measures. These SparkSQL queries may contain clauses like
+ * select, from, where, group-by, order-by, limit, join, etc.
+ *
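+ * A hypothetical `expr` that also computes a boolean column for later use as the bad record
+ * definition (table and column names are illustrative):
+ * {{{
+ *   "expr": "SELECT *, (age > 120) AS invalid_age FROM source"
+ * }}}
+ *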
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
case class SparkSQLMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
import Measure._
+ /**
+ * SparkSQL measure constants
+ */
final val Complete: String = "complete"
final val InComplete: String = "incomplete"
+ /**
+ * SparkSQL measure supports record and metric write
+ */
override val supportsRecordWrite: Boolean = true
override val supportsMetricWrite: Boolean = true
+ /**
+ * The value for `expr` is a valid SparkSQL query string. This is a mandatory parameter.
+ */
private val expr = getFromConfig[String](Expression, StringUtils.EMPTY)
+
+ /**
+ * As the key suggests, its value defines what exactly would be considered a bad record
+ * after this query executes. In order to separate the good data from the bad, a
+ * `bad.record.definition` expression must be set. This expression can be a SparkSQL-like
+ * expression and must yield a column with boolean data type.
+ *
+ * Note: This expression describes the bad records, i.e., if `bad.record.definition` evaluates
+ * to true for a record, it is marked as a bad/ incomplete record.
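+ *
+ * Continuing the illustrative query above, a matching definition could be,
+ * {{{
+ *   "bad.record.definition": "invalid_age"
+ * }}}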
+ */
private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)
validate()
+ /**
+ * Runs the user-provided SparkSQL query and marks the records as complete/ incomplete based on the
+ * `BadRecordDefinition`.
+ *
+ * SparkSQL produces the following 3 metrics as result,
+ * - Total records
+ * - Complete records
+ * - Incomplete records
+ *
+ * @return tuple of records dataframe and metric dataframe
+ */
override def impl(): (DataFrame, DataFrame) = {
val df = sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))
@@ -64,6 +108,9 @@
(badRecordsDf, metricDf)
}
+ /**
+ * Validates that the `Expression` and `BadRecordDefinition` are not null and non-empty.
+ */
override def validate(): Unit = {
assert(
!StringUtil.isNullOrEmpty(expr),
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
index 9d5b9ed..cc5d6d6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -125,6 +125,7 @@
assertResult(metricMap(Unique))("5")
assertResult(metricMap(NonUnique))("0")
assertResult(metricMap(Distinct))("5")
+ assertResult(metricMap(Total))("5")
}
it should "supported complex measure expr" in {
@@ -149,6 +150,7 @@
assertResult(metricMap(Unique))("2")
assertResult(metricMap(NonUnique))("1")
assertResult(metricMap(Distinct))("3")
+ assertResult(metricMap(Total))("5")
}
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index 30c102a..39d1a7f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -115,7 +115,12 @@
"duplication batch job" should "work" in {
dqApp = runApp("/_distinctness-batch-griffindsl.json")
val expectedMetrics =
- Map("duplicate" -> "1", "unique" -> "48", "non_unique" -> "1", "distinct" -> "49")
+ Map(
+ "duplicate" -> "1",
+ "unique" -> "48",
+ "non_unique" -> "1",
+ "distinct" -> "49",
+ "total" -> "50")
runAndCheckResult(Map("duplication_measure" -> expectedMetrics))
}