blob: 0caac17f46e15cbc414f5aadb4656e17a6736c13 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.griffin.measure.execution
import scala.reflect.ClassTag
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.utils.ParamUtil._
/**
 * Measure
 *
 * An abstraction for a data quality measure implementation.
 */
trait Measure extends Loggable {
  import Measure._

  /**
   * SparkSession for this Griffin Application.
   */
  val sparkSession: SparkSession

  /**
   * Object representation of user defined measure.
   */
  val measureParam: MeasureParam

  /**
   * If this measure supports record writing.
   */
  val supportsRecordWrite: Boolean

  /**
   * If this measure supports metric writing.
   */
  val supportsMetricWrite: Boolean

  /**
   * Metric values column: the measure column prefix joined to the measure name by an underscore.
   */
  final val valueColumn: String = s"${MeasureColPrefix}_${measureParam.getName}"

  /**
   * Helper method to get a typed value from measure configuration based on given key.
   *
   * @param key given key for which the value needs to be fetched.
   * @param defValue default value in case of no value.
   * @tparam T type of value to get.
   * @return value for given key
   */
  def getFromConfig[T: ClassTag](key: String, defValue: T): T = {
    measureParam.getConfig.getAnyRef[T](key, defValue)
  }

  /**
   * Enriches metrics dataframe with some additional keys.
   *
   * When metric writing is supported, the value column is copied into a column named
   * [[Measure.Metrics]] and only that column is kept; otherwise the input is returned unchanged.
   *
   * @param input metric dataframe produced by the measure
   * @return the metrics-only dataframe, or `input` as-is
   */
  // todo add status col to persist blank metrics if the measure fails
  def preProcessMetrics(input: DataFrame): DataFrame = {
    if (supportsMetricWrite) {
      input.withColumn(Metrics, col(valueColumn)).select(Metrics)
    } else input
  }

  /**
   * Enriches records dataframe with a status column marking rows as good or bad based on values.
   *
   * When record writing is supported, a [[Measure.Status]] column is added whose value is
   * [[Measure.Good]] where the value column equals 0 and [[Measure.Bad]] otherwise; when it is
   * not supported the input is returned unchanged.
   *
   * @param input records dataframe produced by the measure
   * @return the dataframe with the status column, or `input` as-is
   */
  def preProcessRecords(input: DataFrame): DataFrame = {
    if (supportsRecordWrite) {
      input
        .withColumn(Status, when(col(valueColumn) === 0, Good).otherwise(Bad))
    } else input
  }

  /**
   * Implementation of this measure.
   *
   * @return tuple of records dataframe and metric dataframe
   */
  def impl(): (DataFrame, DataFrame)

  /**
   * Implementation should define validation checks in this method (if required).
   * This method needs to be called explicitly (preferably during measure creation).
   *
   * Defaults to no-op.
   */
  def validate(): Unit = {}

  /**
   * Executes this measure specific transformation on input data source.
   *
   * Runs [[impl]], post-processes both resulting dataframes, and, when a batch id is given,
   * appends it to both of them.
   *
   * @param batchId batch id to append in case of streaming source.
   * @return enriched tuple of records dataframe and metric dataframe
   */
  def execute(batchId: Option[Long] = None): (DataFrame, DataFrame) = {
    val (recordsDf, metricDf) = impl()

    val processedRecordDf = preProcessRecords(recordsDf)
    val processedMetricDf = preProcessMetrics(metricDf)

    // The match is the method's result. The pattern variable is named `id` so it does not
    // shadow the `batchId` parameter.
    batchId match {
      case Some(id) =>
        implicit val bId: Long = id
        (appendBatchIdIfAvailable(processedRecordDf), appendBatchIdIfAvailable(processedMetricDf))
      case None => (processedRecordDf, processedMetricDf)
    }
  }

  /**
   * Appends batch id to the given dataframe in case of streaming sources.
   *
   * The column is added unconditionally here; the caller decides whether a batch id is available.
   *
   * @param input dataframe to which the batch id column is added
   * @param batchId batch id to append
   * @return updated dataframe
   */
  private def appendBatchIdIfAvailable(input: DataFrame)(implicit batchId: Long): DataFrame = {
    input.withColumn(BatchId, typedLit[Long](batchId))
  }
}
/**
 * Measure Constants.
 *
 * String keys, column names and status values shared by measure implementations.
 */
object Measure {

  // Configuration / naming keys. NOTE(review): these are not referenced by the Measure trait
  // in this file; presumably they are consumed by concrete measures — confirm against callers.
  final val DataSource = "data_source"
  final val Expression = "expr"

  // Prefix of the per-measure value column (see `valueColumn` in the Measure trait).
  final val MeasureColPrefix = "__measure"

  // Name of the status column added to record dataframes.
  final val Status = "__status"

  // Name of the batch id column appended when a batch id is supplied to `execute`.
  final val BatchId = "__batch_id"

  // NOTE(review): presumably field names of the emitted metric structure — not used in this file.
  final val MeasureName = "measure_name"
  final val MeasureType = "measure_type"
  final val MetricName = "metric_name"
  final val MetricValue = "metric_value"

  // Name of the column that metric dataframes are reduced to.
  final val Metrics = "metrics"

  // Values written to the status column.
  final val Good = "good"
  final val Bad = "bad"

  // NOTE(review): not used in this file; meaning to be confirmed against concrete measures.
  final val Total: String = "total"
  final val BadRecordDefinition = "bad.record.definition"
  final val AllColumns: String = "*"

  // Column literal holding the empty string.
  final val emptyCol: Column = lit(StringUtils.EMPTY)
}