// blob: b32aa42d16166379fe062d277d3e87750c3b66d8 [file] [log] [blame]
package org.apache.griffin.measure.execution.impl
import io.netty.util.internal.StringUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{expr => sparkExpr, _}
import org.apache.spark.sql.types.BooleanType
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
/**
 * Measure that evaluates a user supplied Spark SQL query and classifies every resulting row as
 * "bad" or "good" using a user supplied boolean expression.
 *
 * Two configuration values are read through `getFromConfig`:
 *  - `Expression`: the SQL text handed to `SparkSession.sql`.
 *  - `BadRecordDefinition`: a Spark SQL expression that must evaluate to a boolean; rows for
 *    which it is `true` are counted as incomplete.
 *
 * Both values are validated when the instance is constructed, so an invalid configuration fails
 * fast with an `AssertionError` instead of failing later inside a Spark job.
 *
 * @param measureParam definition of this measure (name, configuration, ...)
 */
case class SparkSQLMeasure(measureParam: MeasureParam) extends Measure {

  import Measure._

  /** Metric key holding the number of rows for which the bad record condition is not `true`. */
  final val Complete: String = "complete"

  /** Metric key holding the number of rows for which the bad record condition is `true`. */
  final val InComplete: String = "incomplete"

  // NOTE(review): presumably consulted by the execution framework to decide which of the two
  // data frames returned by `impl` get persisted - confirm against `Measure`.
  override val supportsRecordWrite: Boolean = true
  override val supportsMetricWrite: Boolean = true

  /** SQL query to execute; defaults to the empty string when not configured. */
  private val expr = getFromConfig[String](Expression, StringUtils.EMPTY)

  /** Boolean SQL expression flagging bad rows; defaults to the empty string when not configured. */
  private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)

  // Must run after `expr` and `badnessExpr` are initialised, hence its position here.
  validate()

  /**
   * Runs the configured query, flags bad rows and aggregates the counts.
   *
   * @param sparkSession session used to execute the configured SQL query
   * @return a pair `(records, metrics)` where
   *         - `records` is the query result plus the column `valueColumn` holding `1` for rows
   *           matching the bad record condition and `0` otherwise (a `null` condition counts
   *           as `0`), and
   *         - `metrics` is a single row with one map column named `valueColumn` whose keys are
   *           `Total`, `Complete` and `InComplete` and whose values are the counts as strings.
   *           A query returning no rows yields `"0"` for all three counts.
   * @throws AssertionError if the bad record condition does not produce a boolean column
   */
  override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
    val df = sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))

    // Compare the field name literally: `String.matches` would interpret `valueColumn` as a
    // regular expression and misbehave for names containing regex metacharacters.
    assert(
      df.schema.exists(f => f.name == valueColumn && f.dataType == BooleanType),
      s"Invalid condition provided as $BadRecordDefinition. Does not yield a boolean result.")

    // Alternating key/value columns, as expected by `map(...)`.
    val selectCols =
      Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e).cast("string")))
    val metricColumn: Column = map(selectCols: _*).as(valueColumn)

    // Replace the boolean flag with a numeric one so that it can be summed.
    val badRecordsDf = df.withColumn(valueColumn, when(col(valueColumn), 1).otherwise(0))

    // `count` returns 0 for an empty input whereas `sum` returns NULL; coalesce the sum so that
    // an empty query result produces zero counts instead of null metric values.
    val metricDf = badRecordsDf
      .agg(
        count(lit(1)).as(Total),
        coalesce(sum(valueColumn), lit(0L)).as(InComplete))
      .withColumn(Complete, col(Total) - col(InComplete))
      .select(metricColumn)

    (badRecordsDf, metricDf)
  }

  /**
   * Checks that both the query and the bad record condition were configured.
   *
   * @throws AssertionError if either value is null or empty
   */
  private def validate(): Unit = {
    assert(
      !StringUtil.isNullOrEmpty(expr),
      "Invalid query provided as expr. Must not be null, empty or of invalid type.")

    assert(
      !StringUtil.isNullOrEmpty(badnessExpr),
      "Invalid condition provided as bad.record.definition.")
  }
}