/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.griffin.measure.execution.impl

import java.util.Locale

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
import org.apache.griffin.measure.execution.Measure._
import org.apache.griffin.measure.step.builder.ConstantColumns

/**
* Profiling measure.
*
* Data processing and its analysis can't truly be complete without data profiling -
* reviewing source data for content and quality. Data profiling helps to find data quality rules and
* requirements that will support a more thorough data quality assessment in a later step.
*
* The process of Data profiling involves:
* - Collecting descriptive statistics like min, max, count and sum
* - Collecting data types, length and recurring patterns
* - Discovering metadata and assessing its accuracy, etc.
*
 * A common problem in data management circles is the confusion between data profiling and
 * data quality assessment, since the two terms are often used interchangeably.
 *
 * Data profiling gives deep insight into the quality of our data and surfaces the rules and
 * requirements that support a more thorough data quality assessment in a later step. For example,
 * data profiling can discover value frequencies, formats and patterns for each attribute in the
 * data asset. Using data profiling alone we can find perceived defects and outliers in the data
 * asset, leaving us with a range of clues from which appropriate quality assessment measures,
 * such as completeness or distinctness, can be defined.
*
* @param sparkSession SparkSession for this Griffin Application.
* @param measureParam Object representation of this measure and its configuration.
*/
case class ProfilingMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
import ProfilingMeasure._
/**
* Profiling measure supports only metric write
*/
override val supportsRecordWrite: Boolean = false
override val supportsMetricWrite: Boolean = true
/**
 * The value for `expr` is a comma-separated string of column names in the data asset on which
 * the profiling measure is to be executed. `expr` is an optional key for the Profiling measure,
 * i.e., if it is not defined, all columns in the data set will be profiled.
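 *
 * As an illustrative sketch (the column names are hypothetical), the value is parsed roughly as:
 * {{{
 *   "user_id, Age ,email".split(",").map(_.trim.toLowerCase(java.util.Locale.ROOT)).toSet
 *   // -> Set("user_id", "age", "email")
 * }}}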
*/
val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
/**
 * Several resultant metrics of the profiling measure are floating-point numbers. This key controls
 * the extent to which these floating-point numbers are rounded. For example, if `round.scale = 2`,
 * all floating-point metric values will be rounded to 2 decimal places.
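 *
 * A minimal sketch of how the rounding is applied (the column name is hypothetical):
 * {{{
 *   import org.apache.spark.sql.functions._
 *   bround(avg(col("price")), 2)   // average of "price", rounded to 2 decimal places
 * }}}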
*/
val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)
/**
 * The value of this key determines what fraction of the data is profiled. The decimal value
 * belongs to the range [0.0, 1.0], where 0.0 means the whole data set is skipped and 1.0 means
 * the whole data set is profiled. An intermediate value, say 0.5, profiles a random sample of
 * approximately 50% of the rows (sampled without replacement).
*
* This option can be used when the dataset to be profiled is large, and an approximate profile is needed.
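 *
 * Roughly, the sampling corresponds to the following (where `df` is a hypothetical DataFrame):
 * {{{
 *   df.sample(0.5)   // Bernoulli row sampling without replacement, keeps ~50% of the rows
 * }}}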
*/
val dataSetSample: Double = getFromConfig[java.lang.Double](DataSetSampleStr, 1.0)
/**
 * The value for this key is boolean. If it is `true`, the distinct counts will be approximated
 * to allow up to 5% error. Approximate counts are usually faster but less accurate. If it is set
 * to `false`, the counts will be 100% accurate.
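 *
 * A minimal sketch of the two underlying aggregations (the column name is hypothetical):
 * {{{
 *   import org.apache.spark.sql.functions._
 *   approx_count_distinct(col("user_id"))   // approximate, HyperLogLog++ based
 *   countDistinct(col("user_id"))           // exact, requires a full distinct aggregation
 * }}}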
*/
val approxDistinctCount: Boolean =
getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)
/**
 * Various metrics are calculated for the columns of the data set. If `expr` is defined, metrics
 * are generated only for the given subset of columns; otherwise, they are generated for all columns.
 *
 * List of profiling metrics that are generated (an illustrative output sketch follows the list):
* - avg_col_len
* - max_col_len
* - min_col_len
* - avg
* - max
* - min
* - approx_distinct_count OR distinct_count
* - variance
* - kurtosis
* - std_dev
* - total
* - data_type
*
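 * The metric value is an array with one entry per profiled column, each entry mapping the column
 * name to that column's map of metric names and values. An illustrative (not literal) sketch:
 * {{{
 *   [ { "age" -> { "data_type" -> "int", "total" -> 100, "min" -> 18, "max" -> 65, ... } }, ... ]
 * }}}
 *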
* @return tuple of records dataframe and metric dataframe
*/
override def impl(): (DataFrame, DataFrame) = {
info(s"Selecting random ${dataSetSample * 100}% of the rows for profiling.")
val input = sparkSession.read.table(measureParam.getDataSource).sample(dataSetSample)
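    // Resolve the set of column names to profile: the user-supplied `expr` list, or every
    // column of the input when `expr` is not defined. Names are normalised to lower case.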
val profilingColNames = exprOpt
.getOrElse(input.columns.mkString(","))
.split(",")
.map(_.trim.toLowerCase(Locale.ROOT))
.toSet
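    // Match the requested names against the input schema, excluding Griffin's internal
    // timestamp column.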
    val profilingCols =
      input.schema.fields.filter(f =>
        profilingColNames.contains(f.name.toLowerCase(Locale.ROOT)) &&
          !f.name.equalsIgnoreCase(ConstantColumns.tmst))
    assert(
      profilingCols.nonEmpty,
      s"Invalid columns [${profilingColNames.mkString(", ")}] were provided for profiling.")
    val profilingExprs = profilingCols.foldLeft(Array.empty[Column])((exprList, field) => {
      val colName = field.name
      val columnExprs = getProfilingExprs(field, roundScale, approxDistinctCount)
      exprList :+ map(columnExprs: _*).as(s"$DetailsPrefix$colName")
    })
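    // Add per-column helper columns (value length and a null indicator), then compute
    // every metric in a single aggregation pass over the sampled input.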
val aggregateDf = profilingCols
.foldLeft(input)((df, field) => {
val colName = field.name
val column = col(colName)
val lengthColName = lengthColFn(colName)
val nullColName = nullsInColFn(colName)
df.withColumn(lengthColName, length(column))
.withColumn(nullColName, when(isnull(column), 1L).otherwise(0L))
})
.agg(count(lit(1L)).as(Total), profilingExprs: _*)
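    // Re-key each "details_<column>" metric map by its column name and collect all of them
    // into a single array-valued metric column.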
val detailCols =
aggregateDf.columns
.filter(_.startsWith(DetailsPrefix))
        .map(c => map(lit(c.stripPrefix(DetailsPrefix)), col(c)))
val metricDf = aggregateDf.select(array(detailCols: _*).as(valueColumn))
(sparkSession.emptyDataFrame, metricDf)
}
}
/**
* Profiling measure constants
*/
object ProfilingMeasure {
/**
* Options Keys
*/
final val DataSetSampleStr: String = "dataset.sample"
final val RoundScaleStr: String = "round.scale"
final val ApproxDistinctCountStr: String = "approx.distinct.count"
/**
* Structure Keys
*/
final val ColumnDetails: String = "column_details"
private final val DataTypeStr: String = "data_type"
/**
* Prefix Keys
*/
private final val ApproxPrefix: String = "approx_"
private final val DetailsPrefix: String = "details_"
private final val ColumnLengthPrefix: String = "col_len"
private final val IsNullPrefix: String = "is_null"
/**
* Column Detail Keys
*/
private final val NullCount: String = "null_count"
private final val DistinctCount: String = "distinct_count"
private final val Min: String = "min"
private final val Max: String = "max"
private final val Avg: String = "avg"
private final val StdDeviation: String = "std_dev"
private final val Variance: String = "variance"
private final val Kurtosis: String = "kurtosis"
private final val MinColLength: String = s"${Min}_$ColumnLengthPrefix"
private final val MaxColLength: String = s"${Max}_$ColumnLengthPrefix"
private final val AvgColLength: String = s"${Avg}_$ColumnLengthPrefix"
private def lengthColFn(colName: String): String = s"${ColumnLengthPrefix}_$colName"
private def nullsInColFn(colName: String): String = s"${IsNullPrefix}_$colName"
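  /**
   * Returns `value` only when the column type is numeric; otherwise emits a null literal,
   * so the metric key is still present (with a null value) for non-numeric columns.
   */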
private def forNumericFn(t: DataType, value: Column, alias: String): Column = {
(if (t.isInstanceOf[NumericType]) value else lit(null)).as(alias)
}
/**
* Calculates profiling metrics for a column.
*
 * @param field column to profile.
 * @param roundScale number of decimal places for rounding floating-point metrics.
 * @param approxDistinctCount whether to approximate the distinct count or compute it exactly.
 * @return a flattened sequence of (metric name literal, metric expression) column pairs.
*/
private def getProfilingExprs(
field: StructField,
roundScale: Int,
approxDistinctCount: Boolean): Seq[Column] = {
val colName = field.name
val colType = field.dataType
val column = col(colName)
val lengthColExpr = col(lengthColFn(colName))
val nullColExpr = col(nullsInColFn(colName))
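    // Choose between an approximate distinct count (HyperLogLog++ based approx_count_distinct)
    // and an exact countDistinct, depending on the measure configuration.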
val (distinctCountName, distinctCountExpr) =
if (approxDistinctCount) {
(
lit(s"$ApproxPrefix$DistinctCount"),
approx_count_distinct(column).as(s"$ApproxPrefix$DistinctCount"))
} else {
(lit(DistinctCount), countDistinct(column).as(DistinctCount))
}
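    // Each metric is emitted as a (name literal, value expression) pair; the flattened
    // sequence is later folded into a per-column map by map(...) in impl().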
Seq(
Seq(lit(DataTypeStr), lit(colType.catalogString).as(DataTypeStr)),
Seq(lit(Total), sum(lit(1)).as(Total)),
Seq(lit(MinColLength), min(lengthColExpr).as(MinColLength)),
Seq(lit(MaxColLength), max(lengthColExpr).as(MaxColLength)),
Seq(lit(AvgColLength), forNumericFn(colType, avg(lengthColExpr), AvgColLength)),
Seq(lit(Min), forNumericFn(colType, min(column), Min)),
Seq(lit(Max), forNumericFn(colType, max(column), Max)),
Seq(lit(Avg), forNumericFn(colType, bround(avg(column), roundScale), Avg)),
Seq(
lit(StdDeviation),
forNumericFn(colType, bround(stddev(column), roundScale), StdDeviation)),
Seq(lit(Variance), forNumericFn(colType, bround(variance(column), roundScale), Variance)),
Seq(lit(Kurtosis), forNumericFn(colType, bround(kurtosis(column), roundScale), Kurtosis)),
Seq(lit(distinctCountName), distinctCountExpr),
Seq(lit(NullCount), sum(nullColExpr).as(NullCount))).flatten
}
}