blob: 01b0a65e57b02afebab43f80b35550dfb82c398a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.griffin.measure.execution.impl
import java.util.Locale
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
import org.apache.griffin.measure.execution.Measure._
import org.apache.griffin.measure.step.builder.ConstantColumns
* Profiling measure.
* Data processing and its analysis can't truly be complete without data profiling -
* reviewing source data for content and quality. Data profiling helps to find data quality rules and
* requirements that will support a more thorough data quality assessment in a later step.
* The process of Data profiling involves:
* - Collecting descriptive statistics like min, max, count and sum
* - Collecting data types, length and recurring patterns
* - Discovering metadata and assessing its accuracy, etc.
* A common problem in data management circles is the confusion around what is meant by
* Data profiling as opposed to Data Quality Assessment due to the interchangeable use of these 2 terms.
* Data Profiling helps us create a huge amount of insight into the quality levels of our
* data and helps to find data quality rules and requirements that will support a more thorough
* data quality assessment in a later step. For example, data profiling can help us to discover value
* frequencies, formats and patterns for each attribute in the data asset. Using data profiling alone
* we can find some perceived defects and outliers in the data asset, and we end up with a whole
* range of clues based on which correct Quality assessment measures can be defined like
* completeness/ distinctness etc.
* @param sparkSession SparkSession for this Griffin Application.
* @param measureParam Object representation of this measure and its configuration.
case class ProfilingMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
import ProfilingMeasure._
* Profiling measure supports only metric write
override val supportsRecordWrite: Boolean = false
override val supportsMetricWrite: Boolean = true
* The value for `expr` is a comma separated string of columns in the data asset on which the
* profiling measure is to be executed. `expr` is an optional key for Profiling measure,
* i.e., if it is not defined, all columns in the data set will be profiled.
val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
* The value for this key is boolean. If this is `true`, the distinct counts will be approximated
* to allow up to 5% error. Approximate counts are usually faster by are less accurate. If this is set
* to `false`, then the counts will be 100% accurate.
val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)
* Several resultant metrics of profiling measure are floating-point numbers. This key controls to extent
* to which these floating-point numbers are rounded. For example, if `round.scale = 2` then all
* floating-point metric values will be rounded to 2 decimal places.
val approxDistinctCount: Boolean =
getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)
* Various metrics are calculated for columns of the data set. If expr is correctly defined,
* then metrics are generated for only the given subset of columns else, its generated for all columns.
* List of profiling metrics that are generated,
* - avg_col_len
* - max_col_len
* - min_col_len
* - avg
* - max
* - min
* - approx_distinct_count OR distinct_count
* - variance
* - kurtosis
* - std_dev
* - total
* - data_type
* @return tuple of records dataframe and metric dataframe
override def impl(): (DataFrame, DataFrame) = {
val input =
val profilingColNames = exprOpt
val profilingCols =
input.schema.fields.filter(f =>
profilingColNames.contains( && !
s"Invalid columns [${", ")}] were provided for profiling.")
val profilingExprs = profilingCols.foldLeft(Array.empty[Column])((exprList, field) => {
val colName =
val profilingExprs = getProfilingExprs(field, roundScale, approxDistinctCount)
exprList.:+(map(profilingExprs: _*).as(s"$DetailsPrefix$colName"))
val aggregateDf = profilingCols
.foldLeft(input)((df, field) => {
val colName =
val column = col(colName)
val lengthColName = lengthColFn(colName)
val nullColName = nullsInColFn(colName)
df.withColumn(lengthColName, length(column))
.withColumn(nullColName, when(isnull(column), 1L).otherwise(0L))
.agg(count(lit(1L)).as(Total), profilingExprs: _*)
val detailCols =
.flatMap(c => Seq(lit(c.stripPrefix(DetailsPrefix)), col(c)))
val metricDf = aggregateDf
.withColumn(ColumnDetails, map(detailCols: _*))
.select(Total, ColumnDetails)
.select(map(lit(ColumnDetails), col(ColumnDetails)).as(valueColumn))
(sparkSession.emptyDataFrame, metricDf)
* Profiling measure constants
object ProfilingMeasure {
* Options Keys
final val RoundScaleStr: String = "round.scale"
final val ApproxDistinctCountStr: String = "approx.distinct.count"
* Structure Keys
final val ColumnDetails: String = "column_details"
private final val DataTypeStr: String = "data_type"
* Prefix Keys
private final val ApproxPrefix: String = "approx_"
private final val DetailsPrefix: String = "details_"
private final val ColumnLengthPrefix: String = "col_len"
private final val IsNullPrefix: String = "is_null"
* Column Detail Keys
private final val NullCount: String = "null_count"
private final val DistinctCount: String = "distinct_count"
private final val Min: String = "min"
private final val Max: String = "max"
private final val Avg: String = "avg"
private final val StdDeviation: String = "std_dev"
private final val Variance: String = "variance"
private final val Kurtosis: String = "kurtosis"
private final val MinColLength: String = s"${Min}_$ColumnLengthPrefix"
private final val MaxColLength: String = s"${Max}_$ColumnLengthPrefix"
private final val AvgColLength: String = s"${Avg}_$ColumnLengthPrefix"
private def lengthColFn(colName: String): String = s"${ColumnLengthPrefix}_$colName"
private def nullsInColFn(colName: String): String = s"${IsNullPrefix}_$colName"
private def forNumericFn(t: DataType, value: Column, alias: String): Column = {
(if (t.isInstanceOf[NumericType]) value else lit(null)).as(alias)
* Calculates profiling metrics for a column.
* @param field column
* @param roundScale round off places.
* @param approxDistinctCount to approximate distinct or not.
* @return
private def getProfilingExprs(
field: StructField,
roundScale: Int,
approxDistinctCount: Boolean): Seq[Column] = {
val colName =
val colType = field.dataType
val column = col(colName)
val lengthColExpr = col(lengthColFn(colName))
val nullColExpr = col(nullsInColFn(colName))
val (distinctCountName, distinctCountExpr) =
if (approxDistinctCount) {
} else {
(lit(DistinctCount), countDistinct(column).as(DistinctCount))
Seq(lit(DataTypeStr), lit(colType.catalogString).as(DataTypeStr)),
Seq(lit(Total), sum(lit(1)).as(Total)),
Seq(lit(MinColLength), min(lengthColExpr).as(MinColLength)),
Seq(lit(MaxColLength), max(lengthColExpr).as(MaxColLength)),
Seq(lit(AvgColLength), forNumericFn(colType, avg(lengthColExpr), AvgColLength)),
Seq(lit(Min), forNumericFn(colType, min(column), Min)),
Seq(lit(Max), forNumericFn(colType, max(column), Max)),
Seq(lit(Avg), forNumericFn(colType, bround(avg(column), roundScale), Avg)),
forNumericFn(colType, bround(stddev(column), roundScale), StdDeviation)),
Seq(lit(Variance), forNumericFn(colType, bround(variance(column), roundScale), Variance)),
Seq(lit(Kurtosis), forNumericFn(colType, bround(kurtosis(column), roundScale), Kurtosis)),
Seq(lit(distinctCountName), distinctCountExpr),
Seq(lit(NullCount), sum(nullColExpr).as(NullCount))).flatten