blob: 0caac17f46e15cbc414f5aadb4656e17a6736c13 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.execution
import scala.reflect.ClassTag
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.utils.ParamUtil._
/**
 * Measure
 *
 * Contract shared by every data quality measure implementation. A concrete measure supplies
 * the raw computation through `impl()`; `execute` then post-processes the resulting records
 * and metrics dataframes and, when a batch id is supplied, tags both with it.
 */
trait Measure extends Loggable {

  import Measure._

  /**
   * SparkSession for this Griffin Application.
   */
  val sparkSession: SparkSession

  /**
   * Object representation of the user defined measure.
   */
  val measureParam: MeasureParam

  /**
   * Whether this measure supports record writing.
   */
  val supportsRecordWrite: Boolean

  /**
   * Whether this measure supports metric writing.
   */
  val supportsMetricWrite: Boolean

  /**
   * Name of the column carrying this measure's values: the measure column prefix and the
   * measure name joined by an underscore.
   *
   * This is a strict `val` that reads `measureParam` while the trait is being initialised,
   * so `measureParam` has to be assigned before that point.
   */
  final val valueColumn = s"${MeasureColPrefix}_${measureParam.getName}"

  /**
   * Looks up a typed value in the measure configuration.
   *
   * @param key configuration key to look up.
   * @param defValue value handed to the lookup as the default.
   * @tparam T type of the value to fetch.
   * @return value resolved for the given key
   */
  def getFromConfig[T: ClassTag](key: String, defValue: T): T =
    measureParam.getConfig.getAnyRef[T](key, defValue)

  /**
   * Prepares the metrics dataframe for writing.
   *
   * When metric writing is supported, the value column is copied into the metrics column and
   * only that column is kept. Otherwise the input is returned untouched.
   *
   * @param input metrics dataframe produced by the measure.
   * @return dataframe reduced to the metrics column, or the input as is
   */
  // todo add status col to persist blank metrics if the measure fails
  def preProcessMetrics(input: DataFrame): DataFrame =
    if (!supportsMetricWrite) input
    else {
      val withMetrics = input.withColumn(Metrics, col(valueColumn))
      withMetrics.select(Metrics)
    }

  /**
   * Prepares the records dataframe for writing.
   *
   * When record writing is supported, a status column is added that marks a row as good if
   * its value column equals 0 and as bad otherwise; the value column is then dropped.
   * Otherwise the input is returned untouched.
   *
   * @param input records dataframe produced by the measure.
   * @return dataframe with the status column in place of the value column, or the input as is
   */
  def preProcessRecords(input: DataFrame): DataFrame =
    if (!supportsRecordWrite) input
    else {
      val statusCol = when(col(valueColumn) === 0, Good).otherwise(Bad)
      input.withColumn(Status, statusCol).drop(valueColumn)
    }

  /**
   * Implementation of this measure.
   *
   * @return tuple of records dataframe and metric dataframe
   */
  def impl(): (DataFrame, DataFrame)

  /**
   * Implementations should define their validation checks in this method (if required).
   * `execute` does not invoke it, so it has to be called explicitly
   * (preferably during measure creation).
   *
   * Defaults to no-op.
   */
  def validate(): Unit = {}

  /**
   * Executes this measure specific transformation on input data source.
   *
   * Runs `impl()`, pre-processes both of its outputs and, if a batch id is given, appends it
   * to both dataframes.
   *
   * @param batchId batch id to append in case of streaming source.
   * @return enriched tuple of records dataframe and metric dataframe
   */
  def execute(batchId: Option[Long] = None): (DataFrame, DataFrame) = {
    val (rawRecords, rawMetrics) = impl()
    val records = preProcessRecords(rawRecords)
    val metrics = preProcessMetrics(rawMetrics)

    // Without a batch id the processed dataframes are returned unchanged.
    batchId.fold((records, metrics)) { id =>
      (appendBatchIdIfAvailable(records)(id), appendBatchIdIfAvailable(metrics)(id))
    }
  }

  /**
   * Adds the batch id column to the given dataframe. Used for both the records and the
   * metrics dataframe.
   *
   * @param input dataframe to enrich
   * @param batchId batch id to append
   * @return dataframe with the batch id column added
   */
  private def appendBatchIdIfAvailable(input: DataFrame)(implicit batchId: Long): DataFrame = {
    val batchIdCol = typedLit[Long](batchId)
    input.withColumn(BatchId, batchIdCol)
  }
}
/**
 * Measure Constants.
 *
 * String and column constants shared by measures. They are grouped below by the role they
 * play in the `Measure` trait; constants that the trait does not reference are listed last.
 */
object Measure {

  // Column names used by the Measure trait's post-processing.

  /** Prefix of the per-measure value column (joined with the measure name). */
  final val MeasureColPrefix = "__measure"

  /** Column added to records to mark each row as good or bad. */
  final val Status = "__status"

  /** Column added to records and metrics when a batch id is supplied. */
  final val BatchId = "__batch_id"

  /** Column the metrics dataframe is reduced to. */
  final val Metrics = "metrics"

  // Values written into the status column.

  final val Good = "good"
  final val Bad = "bad"

  // Not referenced by the Measure trait in this file.
  // NOTE(review): presumably configuration keys read by concrete measures - confirm.

  final val DataSource = "data_source"
  final val Expression = "expr"
  final val BadRecordDefinition = "bad.record.definition"

  // NOTE(review): presumably field names of emitted metrics - confirm against the writers.

  final val MeasureName = "measure_name"
  final val MeasureType = "measure_type"
  final val MetricName = "metric_name"
  final val MetricValue = "metric_value"
  final val Total: String = "total"

  // NOTE(review): looks like a select-everything wildcard - confirm against usages.
  final val AllColumns: String = "*"

  /** Literal column holding the empty string. */
  final val emptyCol: Column = lit(StringUtils.EMPTY)
}