package org.apache.griffin.measure.execution
import scala.reflect.ClassTag
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.utils.ParamUtil._
trait Measure extends Loggable {
import Measure._
val measureParam: MeasureParam
val supportsRecordWrite: Boolean
val supportsMetricWrite: Boolean
final val valueColumn = s"${MeasureColPrefix}_${measureParam.getName}"
def getFromConfig[T: ClassTag](key: String, defValue: T): T = {
measureParam.getConfig.getAnyRef[T](key, defValue)
// todo add status col to persist blank metrics if the measure fails
def preProcessMetrics(input: DataFrame): DataFrame = {
if (supportsMetricWrite) {
.withColumn(MeasureName, typedLit[String](measureParam.getName))
.withColumn(MeasureType, typedLit[String](measureParam.getType.toString))
.withColumn(Metrics, col(valueColumn))
.withColumn(DataSource, typedLit[String](measureParam.getDataSource))
.select(MeasureName, MeasureType, DataSource, Metrics)
} else input
def preProcessRecords(input: DataFrame): DataFrame = {
if (supportsRecordWrite) {
.withColumn(Status, when(col(valueColumn) === 0, Good).otherwise(Bad))
} else input
def impl(sparkSession: SparkSession): (DataFrame, DataFrame)
def execute(context: DQContext, batchId: Option[Long] = None): (DataFrame, DataFrame) = {
val (recordsDf, metricDf) = impl(context.sparkSession)
val processedRecordDf = preProcessRecords(recordsDf)
val processedMetricDf = preProcessMetrics(metricDf)
var batchDetailsOpt = StringUtils.EMPTY
val res = batchId match {
case Some(batchId) =>
implicit val bId: Long = batchId
batchDetailsOpt = s"for batch id $bId"
(appendBatchIdIfAvailable(processedRecordDf), appendBatchIdIfAvailable(processedMetricDf))
case None => (processedRecordDf, processedMetricDf)
s"Execution of '${measureParam.getType}' measure " +
s"with name '${measureParam.getName}' is complete $batchDetailsOpt")
private def appendBatchIdIfAvailable(input: DataFrame)(implicit batchId: Long): DataFrame = {
input.withColumn(BatchId, typedLit[Long](batchId))
object Measure {
final val DataSource = "data_source"
final val Expression = "expr"
final val MeasureColPrefix = "__measure"
final val Status = "__status"
final val BatchId = "__batch_id"
final val MeasureName = "measure_name"
final val MeasureType = "measure_type"
final val Metrics = "metrics"
final val Good = "good"
final val Bad = "bad"
final val Total: String = "total"
final val BadRecordDefinition = "bad.record.definition"
final val AllColumns: String = "*"
final val emptyCol: Column = lit(StringUtils.EMPTY)