package org.apache.griffin.measure.execution.impl
import java.util.Locale
import io.netty.util.internal.StringUtil
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
import org.apache.griffin.measure.step.builder.ConstantColumns
/**
 * Accuracy Measure.
 *
 * Data accuracy refers to the degree to which the values of a given attribute in a data source agree
 * with an identified reference truth data (source of correct information).
 *
 * Inaccurate data may come from different sources like,
 *  - dynamically computed values,
 *  - the result of a manual workflow,
 *  - irate customers, etc.
 *
 * Accuracy measure quantifies the extent to which data sets contain correct, reliable and certified
 * values that are free of error. Higher accuracy values signify that the data set represents
 * the "real-life" values/ objects that it intends to model.
 *
 * Accuracy measure is comparative in nature - attributes of the data source to be checked are compared
 * with attributes of another reference source. Thus, unlike other measures/ dimensions, Accuracy
 * relies on the definition of 2 sources,
 *  - the reference (truth) source which contains the good/ correct/ accurate values.
 *  - the actual data source to be assessed and measured for data accuracy.
 *
 * @param sparkSession SparkSession for this Griffin Application.
 * @param measureParam Object representation of this measure and its configuration.
 */
case class AccuracyMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
/**
 * Representation of a single accuracy expression object.
 *
 * @param sourceCol name of source column
 * @param refCol name of reference column
 */
// Built by toAccuracyExpr from one `expr` entry, and again in impl() with prefixed column names.
case class AccuracyExpr(sourceCol: String, refCol: String)
import AccuracyMeasure._
import Measure._
/**
 * Accuracy measure supports record and metric write.
 */
// Both outputs are enabled; impl() returns a (records, metrics) pair of DataFrames.
override val supportsRecordWrite: Boolean = true
override val supportsMetricWrite: Boolean = true
/**
 * The value for expr is a json array of comparison objects where each object has 2 fields -
 * `source.col` and `ref.col` which must be actual columns in the source and reference data sets respectively.
 * This key is mandatory and the expr array must not be empty, i.e. at least one comparison must be defined.
 */
// Wrapped in Option so that a null result from getFromConfig becomes None;
// validate() asserts that this is defined and not empty.
// NOTE(review): `null` is passed as the second argument — presumably the default returned when
// the key is absent; confirm against Measure.getFromConfig.
val exprOpt: Option[Seq[Map[String, String]]] =
Option(getFromConfig[Seq[Map[String, String]]](Expression, null))
/**
 * This is a mandatory parameter which selects the data source which will be used as reference.
 * This data source must be defined in the sources section of the application configuration.
 */
// NOTE(review): not wrapped in Option, so this may be null if the key is absent (null is passed
// as the second argument to getFromConfig, presumably the default) — confirm that validate()
// rejects that case.
val refSource: String = getFromConfig[String](ReferenceSourceStr, null)
/**
 * Performs a measurement of common values as a join between the mapped columns of the reference and source
 * data sets.
 *
 * Accuracy produces the following 3 metrics as result,
 *  - Total records
 *  - Accurate records
 *  - Inaccurate records
 *
 * @return tuple of records dataframe and metric dataframe
 */
override def impl(): (DataFrame, DataFrame) = {
// Left-joins the prefixed source data set to the prefixed reference data set, flags every
// source record as accurate (0) or inaccurate (1), then aggregates the flags into metrics.
//
// NOTE(review): several statements in this method are truncated in this copy of the file
// (dangling `=` with no right-hand side, missing receivers before `.map`/`.join`, an empty
// first argument, no closing brace). Each spot is marked below; restore the missing code from
// version control before relying on this method.

// NOTE(review): right-hand side is missing — presumably the source data set is loaded here; confirm.
val originalSource =
// Un-prefixed source column names, used by the final select on recordsDf.
val originalCols = originalSource.columns
// Prefix every source column with SourcePrefixStr so names cannot collide after the join.
val dataSource = addColumnPrefix(originalSource, SourcePrefixStr)
// NOTE(review): the first argument (the reference DataFrame) is missing below.
val refDataSource =
addColumnPrefix(, refPrefixStr)
// exprOpt.get is not guarded here; validate() asserts that exprOpt is defined.
// NOTE(review): `x.sourceCol` / `x.refCol` are not members of Map[String, String], so a
// conversion step (presumably `.map(toAccuracyExpr)`) appears to be missing before this
// `.map` — confirm.
val accuracyExprs = exprOpt.get
.map(x => AccuracyExpr(s"$SourcePrefixStr${x.sourceCol}", s"$refPrefixStr${x.refCol}"))
// Join condition: every mapped source column equals its reference column (conditions AND-ed).
// NOTE(review): the receiver of `.map` is missing — presumably `accuracyExprs`; confirm.
val joinExpr =
.map(e => col(e.sourceCol) === col(e.refCol))
.reduce(_ and _)
// True when any mapped pair differs after nulls on either side are replaced by emptyCol.
// NOTE(review): the receiver of `.map` is missing — presumably `accuracyExprs`; confirm.
val indicatorExpr =
.map(e => coalesce(col(e.sourceCol), emptyCol) notEqual coalesce(col(e.refCol), emptyCol))
.reduce(_ or _)
// True when any mapped source column is null.
// NOTE(review): the start of the expression (receiver and `.map(e`) is missing; confirm.
val nullExpr = => col(e.sourceCol).isNull).reduce(_ or _)
// Record-level result: left join keeps every source row; valueColumn is 1 when the row is
// flagged by indicatorExpr or nullExpr and 0 otherwise. Only the original source columns plus
// valueColumn are selected.
// NOTE(review): the DataFrame being joined and the second argument of removeColumnPrefix
// (the prefix to strip) are both missing below; confirm.
val recordsDf = removeColumnPrefix(
.join(refDataSource, joinExpr, "left")
.withColumn(valueColumn, when(indicatorExpr or nullExpr, 1).otherwise(0)),
.select((originalCols :+ valueColumn).map(col): _*)
// Alternating (metric name literal, metric value cast to string) arguments for map(...).
val selectCols =
Seq(Total, AccurateStr, InAccurateStr).flatMap(e => Seq(lit(e), col(e).cast("string")))
// NOTE(review): metricColumn is not referenced in the visible lines; a trailing
// `.select(metricColumn)` on metricDf may have been lost — confirm.
val metricColumn: Column = map(selectCols: _*).as(valueColumn)
// Total = number of records (sum of a literal 1 per row), InAccurateStr = sum of the 0/1
// flags, AccurateStr = Total - InAccurateStr.
val metricDf = recordsDf
.withColumn(Total, lit(1))
.agg(sum(Total).as(Total), sum(valueColumn).as(InAccurateStr))
.withColumn(AccurateStr, col(Total) - col(InAccurateStr))
(recordsDf, metricDf)
/**
 * Converts the map obtained by deserializing one JSON `expr` entry into the fixed
 * `AccuracyExpr` structure used for every expression object.
 *
 * Both column keys are checked (source first, then reference) before either value is read;
 * a missing key fails the corresponding assertion.
 *
 * @param map map representation of a single `expr` entry
 * @return instance of `AccuracyExpr` holding the mapped source and reference column names
 */
private def toAccuracyExpr(map: Map[String, String]): AccuracyExpr = {
  Seq(SourceColStr, ReferenceColStr).foreach { requiredKey =>
    assert(map.contains(requiredKey), s"'$requiredKey' must be defined.")
  }

  val sourceColumn = map(SourceColStr)
  val referenceColumn = map(ReferenceColStr)
  AccuracyExpr(sourceColumn, referenceColumn)
}
/**
 * Validates that the expression is not null and not empty, along with some data set specific validations.
 */
override def validate(): Unit = {
// NOTE(review): several statements in this method are truncated in this copy of the file
// (assertion messages without their `assert(` call, dangling `=` with no right-hand side,
// no closing brace). Each spot is marked below; restore the missing code from version control.

// `expr` must be present in the configuration ...
assert(exprOpt.isDefined, s"'$Expression' must be defined.")
// ... and at least one of its maps must contain an entry.
assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
// NOTE(review): the next two lines are only the message arguments; the opening `assert(<condition>,`
// parts are missing. Judging by the messages they check that refSource is non-empty and that
// the reference source exists — confirm.
s"'$ReferenceSourceStr' must not be null, empty or of invalid type.")
s"Reference source with name '$refSource' does not exist.")
val datasourceName = measureParam.getDataSource
// NOTE(review): right-hand sides of the next four definitions are missing. From their later
// use, dataSourceCols / refDataSourceCols are collections of column names and accuracyExpr
// is presumably the parsed expression list — confirm.
val dataSourceCols =
val refDataSourceCols =
val accuracyExpr =
val (forDataSource, forRefDataSource) =
// For each expression: (column name, whether that column exists) for the source side and
// for the reference side.
// NOTE(review): the enclosing call around this lambda is missing — presumably it splits the
// pairs into one collection per data set; confirm.
e =>
(e.sourceCol, dataSourceCols.contains(e.sourceCol)),
(e.refCol, refDataSourceCols.contains(e.refCol))))
// Keep only the columns whose existence flag is false.
val invalidColsDataSource = forDataSource.filterNot(_._2)
val invalidColsRefSource = forRefDataSource.filterNot(_._2)
// NOTE(review): the `assert(<condition>,` openings and the expressions inside `${...}` (the
// joined invalid column names) are missing from the two messages below; confirm.
s"Column(s) [${", ")}] " +
s"do not exist in data set with name '$datasourceName'")
s"Column(s) [${", ")}] " +
s"do not exist in reference data set with name '$refSource'")
/**
 * Helper method to prepend a prefix to all column names to uniquely identify them.
 * If the same column name exists in both data sets being joined, the prefixed names no
 * longer collide.
 *
 * All columns are renamed positionally in a single `toDF` call. Renaming one column at a time
 * by name is avoided for two reasons: a freshly produced name that equals a column not yet
 * processed (e.g. columns `a` and `pa` with prefix `p`) would be renamed a second time, and
 * every individual rename adds another projection to the logical plan.
 *
 * @param dataFrame data set whose columns are to be renamed
 * @param prefix prefix to prepend to every column name
 * @return data set with the same columns in the same order, each named `prefix + originalName`
 */
private def addColumnPrefix(dataFrame: DataFrame, prefix: String): DataFrame = {
  val prefixedNames = dataFrame.columns.map(name => s"$prefix$name")
  dataFrame.toDF(prefixedNames: _*)
}
/**
 * Helper method to strip a prefix from all column names that previously helped to uniquely
 * identify them. Column names that do not start with the prefix are left unchanged.
 *
 * All columns are renamed positionally in a single `toDF` call. Renaming one column at a time
 * by name is avoided for two reasons: a freshly stripped name that equals a column not yet
 * processed (e.g. columns `ppa` and `pa` with prefix `p`) would be stripped a second time, and
 * every individual rename adds another projection to the logical plan.
 *
 * @param dataFrame data set whose columns are to be renamed
 * @param prefix prefix to remove from the start of each column name
 * @return data set with the same columns in the same order, each with one leading `prefix` removed
 */
private def removeColumnPrefix(dataFrame: DataFrame, prefix: String): DataFrame = {
  val strippedNames = dataFrame.columns.map(_.stripPrefix(prefix))
  dataFrame.toDF(strippedNames: _*)
}
/**
 * Accuracy measure constants
 */
object AccuracyMeasure {
// Prefix prepended to every source column before the join in impl().
final val SourcePrefixStr: String = "__source_"
// Prefix prepended to every reference column before the join in impl().
// NOTE(review): lower-camel name is inconsistent with the other constants here; renaming would
// touch every reference, so it is left as is.
final val refPrefixStr: String = "__ref_"
// Configuration key from which the reference source name (refSource) is read.
final val ReferenceSourceStr: String = "ref.source"
// Key of the source column name inside each `expr` entry (read by toAccuracyExpr).
final val SourceColStr: String = "source.col"
// Key of the reference column name inside each `expr` entry (read by toAccuracyExpr).
final val ReferenceColStr: String = "ref.col"
// Metric name for Total minus inaccurate records (see metricDf in impl()).
final val AccurateStr: String = "accurate"
// Metric name for the sum of the per-record 0/1 inaccuracy flags (see metricDf in impl()).
final val InAccurateStr: String = "inaccurate"