blob: f8ea81d309bc23ff90e2b22a0edaf1d1fb88216a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.griffin.measure.execution.impl
import io.netty.util.internal.StringUtil
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
* Completeness Measure.
* Completeness refers to the degree to which values are present in a data collection.
* When data is incomplete due to unavailability (missing records), this does not represent a lack of completeness.
* As far as an individual datum is concerned, only two situations are possible - either a value is assigned
* to the attribute in question or not. The latter case is usually represented by a `null` value.
* @param sparkSession SparkSession for this Griffin Application.
* @param measureParam Object representation of this measure and its configuration.
case class CompletenessMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
import Measure._
* Completeness Constants
// Metric name for the count of complete records; impl() uses it as a column name.
final val Complete: String = "complete"
// Metric name for the count of incomplete records; impl() uses it as a column name.
final val InComplete: String = "incomplete"
* Completeness measure supports record and metric write
// Overrides the inherited flag to declare that this measure supports writing records.
override val supportsRecordWrite: Boolean = true
// Overrides the inherited flag to declare that this measure supports writing metrics.
override val supportsMetricWrite: Boolean = true
* The value for expr is a SQL-like expression string that defines this completeness check.
* For more complex definitions, expressions can be clubbed with AND and OR.
* For a tabular data set with columns name, email and age, some examples of `expr` are mentioned below,
* - name is NULL
* - name is NULL and age is NULL
* - email NOT RLIKE `'^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'` (without ` and `)
* Note: This expression describes the bad or incomplete records. This means that
* for "expr": "zipcode is NULL" the records which contain null in zipcode column are considered as incomplete.
// Looks up the expression in the measure configuration, passing null as the second
// argument (presumably the fallback for a missing key — confirm against getFromConfig).
// Wrapping the result in Option(...) turns a null value into None.
val exprOpt: Option[String] = {
  val configuredExpr: String = getFromConfig[String](Expression, null)
  Option(configuredExpr)
}
* Completeness Measure.
* Completeness evaluates the user provided `expr` for each row of the input dataset.
* Each row that satisfies this expression is tagged as an incomplete record; all other records are complete.
* Completeness produces the following 3 metrics as result,
* - Total records
* - Complete records
* - Incomplete records
* @return tuple of records dataframe and metric dataframe
// Evaluates the configured expression against every input row and derives the
// total / complete / incomplete counts. Returns the pair (badRecordsDf, metricDf) built below.
override def impl(): (DataFrame, DataFrame) = {
// Option.get throws NoSuchElementException when no expression is configured;
// presumably validate() is expected to have run first — confirm with the caller.
val exprStr = exprOpt.get
// Alternating (metric-name literal, metric column cast to string) pairs: key1, value1, key2, value2, ...
val selectCols =
Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e).cast("string")))
// Single map column, aliased as valueColumn, holding the three metrics keyed by their names.
// NOTE(review): metricColumn is not referenced by any statement visible below —
// confirm whether a final select on it is intended for metricDf.
val metricColumn: Column = map(selectCols: _*).as(valueColumn)
// NOTE(review): no right-hand side for `input` is visible here — confirm the source of the input DataFrame.
val input =
// Flags each row in valueColumn: 1 when the expression holds for the row (counted as incomplete), 0 otherwise.
val badRecordsDf = input.withColumn(valueColumn, when(expr(exprStr), 1).otherwise(0))
// Total is the sum of a constant 1 per row, InComplete is the sum of the per-row flags,
// and Complete is derived as Total - InComplete.
val metricDf = badRecordsDf
.withColumn(Total, lit(1))
.agg(sum(Total).as(Total), sum(valueColumn).as(InComplete))
.withColumn(Complete, col(Total) - col(InComplete))
(badRecordsDf, metricDf)
* Validates if expression is defined and is non empty.
override def validate(): Unit = {
  // exprOpt is None when the configuration yields no expression (a null value is wrapped to None).
  assert(exprOpt.isDefined, s"'$Expression' must be defined.")

  // Checks the expression string itself. The former `exprOpt.nonEmpty` assertion only
  // repeated `isDefined` (both test the Option, not the string) and could never fail here,
  // so the emptiness check is done once, on the wrapped value, without calling `.get`.
  assert(
    exprOpt.exists(e => !StringUtil.isNullOrEmpty(e)),
    s"'$Expression' must not be null or empty.")
}