| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.griffin.measure.execution.impl |
| |
| import io.netty.util.internal.StringUtil |
| import org.apache.spark.sql.{Column, DataFrame, SparkSession} |
| import org.apache.spark.sql.functions._ |
| |
| import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam |
| import org.apache.griffin.measure.execution.Measure |
| |
| /** |
| * Completeness Measure. |
| * |
| * Completeness refers to the degree to which values are present in a data collection. |
| * When data is incomplete due to unavailability (missing records), this does not represent a lack of completeness. |
| * As far as an individual datum is concerned, only two situations are possible - either a value is assigned |
| * to the attribute in question or not. The latter case is usually represented by a `null` value. |
| * |
| * @param sparkSession SparkSession for this Griffin Application. |
| * @param measureParam Object representation of this measure and its configuration. |
| */ |
case class CompletenessMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
    extends Measure {

  import Measure._

  /**
   * Completeness Constants
   */
  final val Complete: String = "complete"
  final val InComplete: String = "incomplete"

  /**
   * Completeness measure supports record and metric write
   */
  override val supportsRecordWrite: Boolean = true

  override val supportsMetricWrite: Boolean = true

  /**
   * The value for `expr` is a SQL-like expression string which defines this completeness check.
   * For more complex definitions, expressions can be combined with AND and OR.
   *
   * For a tabular data set with columns name, email and age, some examples of `expr` are mentioned below,
   *  - name is NULL
   *  - name is NULL and age is NULL
   *  - email NOT RLIKE `'^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'` (without the surrounding backticks)
   *
   * Note: This expression describes the bad or incomplete records. This means that
   * for "expr": "zipcode is NULL" the records which contain null in zipcode column are considered as incomplete.
   *
   * The option is empty when the expression is absent from the configuration.
   */
  val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))

  // Fail fast at construction time; must stay after `exprOpt` so the value is initialised.
  validate()

  /**
   * Completeness Measure.
   *
   * Completeness evaluates the user provided `expr` for each row of the input dataset.
   * Each row that satisfies this expression is tagged as an incomplete record (value column = 1),
   * all other records, including those for which the expression evaluates to null, are
   * complete (value column = 0).
   *
   * Completeness produces the following 3 metrics as result,
   *  - Total records
   *  - Complete records
   *  - Incomplete records
   *
   * For an empty input dataset all three metrics are reported as 0.
   *
   * @return tuple of records dataframe and metric dataframe
   */
  override def impl(): (DataFrame, DataFrame) = {
    // validate() ran in the constructor and asserted that the expression is defined.
    val exprStr = exprOpt.get

    // Build a single map column: metric name -> metric value rendered as string.
    val selectCols =
      Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e).cast("string")))
    val metricColumn: Column = map(selectCols: _*).as(valueColumn)

    val input = sparkSession.read.table(measureParam.getDataSource)
    val badRecordsDf = input.withColumn(valueColumn, when(expr(exprStr), 1).otherwise(0))

    // count() yields 0 on an empty dataset, whereas sum() yields NULL; coalesce keeps the
    // incomplete count (and therefore the derived complete count) numeric in that case.
    val metricDf = badRecordsDf
      .agg(
        count(lit(1)).as(Total),
        coalesce(sum(valueColumn), lit(0)).as(InComplete))
      .withColumn(Complete, col(Total) - col(InComplete))
      .select(metricColumn)

    (badRecordsDf, metricDf)
  }

  /**
   * Validates that the expression is defined and is neither empty nor blank.
   *
   * Raises an `AssertionError` (via `assert`) when the check fails.
   */
  override def validate(): Unit = {
    assert(exprOpt.isDefined, s"'$Expression' must be defined.")

    // A whitespace-only expression can never be parsed, so it is rejected here as well.
    assert(
      exprOpt.exists(e => !StringUtil.isNullOrEmpty(e.trim)),
      s"'$Expression' must not be null or empty.")
  }
}