/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.execution.impl

import io.netty.util.internal.StringUtil
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure

/**
* Duplication Measure
*
* Definition of duplication measures used:
* - Duplicate: the number of values that are the same as other values in the list
* - Distinct: the number of non-null values that are different from each other (Non-unique + Unique)
* - Non Unique: the number of values that have at least one duplicate in the list
* - Unique: the number of values that have no duplicates
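 *
 * For example, for the values [a, a, a, b, b, c]:
 * duplicate = 3, non unique = 2, unique = 1 and distinct = 3.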
*
* @param measureParam Measure Param
*/
case class DuplicationMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
    extends Measure {

  import DuplicationMeasure._
  import Measure._

  private final val duplicationMeasures = Seq(Duplicate, Unique, NonUnique, Distinct)

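  // 'Expression' may hold a comma-separated list of key columns to check for duplication;
  // 'BadRecordDefinition' names the duplication flag that marks a record as bad.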
  val exprs: String = getFromConfig[String](Expression, null)
  private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)

  validate()

  override val supportsRecordWrite: Boolean = true
  override val supportsMetricWrite: Boolean = true

  override def impl(): (DataFrame, DataFrame) = {
    val input = sparkSession.read.table(measureParam.getDataSource)
    val cols = keyCols(input).map(col)

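    // A row is considered null only when every key column is null; such rows are excluded
    // from the unique, non-unique and distinct counts, but can still count as duplicates.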
    val isNullCol = cols.map(x => x.isNull).reduce(_ and _)
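    // __temp holds the row number of each record within its key group, so every occurrence
    // of a key after the first one is flagged as a duplicate.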
    val duplicateCol = when(col(__Temp) > 1, 1).otherwise(0)
    val uniqueCol = when(not(isNullCol) and col(Unique) === 1, 1).otherwise(0)
    val distinctCol =
      when(not(isNullCol) and (col(Unique) === 1 or col(NonUnique) === 1), 1).otherwise(0)
    val nonUniqueCol =
      when(not(isNullCol) and col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1)
        .otherwise(0)

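    // Row numbers, group counts and minimum row numbers are all computed over this window,
    // partitioned (and ordered) by the key columns, i.e. per distinct key.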
    val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
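    // Unique and NonUnique are each written in two passes: first with the windowed group count
    // and minimum row number that the flag expressions need, then overwritten with the final
    // 0/1 flags, so that each duplicated key is counted as non-unique exactly once.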
    val aggDf = input
      .select(col(AllColumns), row_number().over(window).as(__Temp))
      .withColumn(IsNull, isNullCol)
      .withColumn(Duplicate, duplicateCol)
      .withColumn(Unique, count(lit(1)).over(window))
      .withColumn(Unique, uniqueCol)
      .withColumn(NonUnique, min(__Temp).over(window))
      .withColumn(NonUnique, nonUniqueCol)
      .withColumn(Distinct, distinctCol)
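      // Expose the flag selected by 'BadRecordDefinition' as the record-level value column.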
      .withColumn(valueColumn, col(badnessExpr))
      .drop(__Temp, IsNull)

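    // Metric output: sum each 0/1 flag and pack the totals into a single map column
    // keyed by measure name, with the counts rendered as strings.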
    val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
    val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e).cast("string")))
    val metricColumn = map(selectCols: _*).as(valueColumn)

    val metricDf = aggDf
      .agg(metricAggCols.head, metricAggCols.tail: _*)
      .select(metricColumn)

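    // Record output: the original rows plus the selected bad-record flag, with the
    // intermediate duplication flag columns dropped.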
    val badRecordsDf = aggDf.drop(duplicationMeasures: _*)

    (badRecordsDf, metricDf)
  }

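  // Runs at construction time; fails fast if the configured key columns are missing from the
  // dataset or if the bad-record definition is not one of the four duplication measures.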
  override def validate(): Unit = {
    val input = sparkSession.read.table(measureParam.getDataSource)
    val kc = keyCols(input)
    assert(kc.nonEmpty, s"The list of columns defined in '$Expression' is empty.")
    kc.foreach(c =>
      assert(input.columns.contains(c), s"Provided column '$c' does not exist in the dataset."))
    assert(
      !StringUtil.isNullOrEmpty(badnessExpr),
      s"Invalid value '$badnessExpr' provided for $BadRecordDefinition")
    assert(badnessExpr match {
      case Duplicate | Unique | NonUnique | Distinct => true
      case _ => false
    }, s"Invalid value '$badnessExpr' was provided for $BadRecordDefinition")
  }

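  // Key columns to check: the comma-separated list configured in 'Expression', or every column
  // of the input data set when no expression is configured. Duplicate entries are removed.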
  private def keyCols(input: DataFrame): Array[String] = {
    if (StringUtil.isNullOrEmpty(exprs)) input.columns
    else exprs.split(",").map(_.trim)
  }.distinct
}

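// Column names and metric keys used internally by DuplicationMeasure.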
object DuplicationMeasure {
  final val IsNull: String = "is_null"
  final val Duplicate: String = "duplicate"
  final val Unique: String = "unique"
  final val NonUnique: String = "non_unique"
  final val Distinct: String = "distinct"
  final val __Temp: String = "__temp"
}