/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.griffin.measure.execution.impl

import java.util.Locale

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StringType}

import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure

/**
 * SchemaConformance Measure.
 *
 * Schema Conformance ensures that the attributes of a given dataset follow a set of
 * standard data definitions in terms of data type.
 * Most binary file formats (orc, avro, etc.) and tabular sources (Hive, RDBMS, etc.)
 * already impose type constraints on the data they represent, but text-based formats like
 * csv, json, xml, etc. do not retain schema information.
 *
 * For example, a customer's date of birth should be a date, and age should be an integer.
 *
 * @param sparkSession SparkSession for this Griffin Application.
 * @param measureParam Object representation of this measure and its configuration.
 */
case class SchemaConformanceMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
    extends Measure {

  import Measure._

  /**
   * Representation of a single SchemaConformance expression object.
   *
   * @param sourceCol name of the source column
   * @param dataType expected data type of the source column
   */
  case class SchemaConformanceExpr(sourceCol: String, dataType: DataType)

  /**
   * SchemaConformance Constants
   */
  final val SourceColStr: String = "source.col"
  final val DataTypeStr: String = "type"

  final val Complete: String = "complete"
  final val InComplete: String = "incomplete"

  /**
   * SchemaConformance measure supports record and metric write
   */
  override val supportsRecordWrite: Boolean = true

  override val supportsMetricWrite: Boolean = true

  /**
   * The value for expr is a JSON array of mapping objects where each object has 2 fields -
   * `source.col` and `type`. Each `source.col` must exist in the source data set and is checked
   * to be of type `type`.
   * This key is mandatory, and the expr array must not be empty, i.e. at least one mapping must be defined.
   */
  val exprOpt: Option[Seq[Map[String, String]]] =
    Option(getFromConfig[Seq[Map[String, String]]](Expression, null))
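
  // Illustrative `expr` value checking two columns; the column names and types below are
  // hypothetical examples, not defaults of this measure:
  //   "expr": [
  //     {"source.col": "dob", "type": "date"},
  //     {"source.col": "age", "type": "int"}
  //   ]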

  validate()

  /**
   * SchemaConformance measure evaluates the user-provided `expr` for each row of the input dataset.
   * Each row that fails this type check is tagged as an incomplete record; all other records are complete.
   *
   * SchemaConformance produces the following 3 metrics as its result:
   *  - Total records
   *  - Complete records
   *  - Incomplete records
   *
   * @return tuple of records dataframe and metric dataframe
   */
  override def impl(): (DataFrame, DataFrame) = {
    val givenExprs = exprOpt.get.map(toSchemaConformanceExpr).distinct
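
    // The condition below flags a record as incomplete if any configured column value is null
    // or cannot be cast to its expected type. For example (hypothetical columns dob -> date,
    // age -> int), the combined condition is roughly:
    //   cast(dob as date) IS NULL OR cast(age as int) IS NULL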
    val incompleteExpr = givenExprs
      .map(e => when(col(e.sourceCol).cast(e.dataType).isNull, true).otherwise(false))
      .reduce(_ or _)

    val selectCols =
      Seq(Total, Complete, InComplete).map(e =>
        map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
    val metricColumn: Column = array(selectCols: _*).as(valueColumn)
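
    // Illustration (assumed numbers): each record below gets a 0/1 incomplete flag, the
    // aggregation sums a constant 1 for the total and the flag for the incomplete count,
    // and complete = total - incomplete, e.g. 100 records with 7 failures -> 100/93/7.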
    val input = sparkSession.read.table(measureParam.getDataSource)
    val badRecordsDf = input.withColumn(valueColumn, when(incompleteExpr, 1).otherwise(0))

    val metricDf = badRecordsDf
      .withColumn(Total, lit(1))
      .agg(sum(Total).as(Total), sum(valueColumn).as(InComplete))
      .withColumn(Complete, col(Total) - col(InComplete))
      .select(metricColumn)

    (badRecordsDf, metricDf)
  }

  /**
   * The JSON representation of `expr` is internally deserialized as a Map, which is then converted
   * to a `SchemaConformanceExpr` representation for a fixed structure across all expression object(s).
   *
   * @param map map representation of the `expr`
   * @return instance of `SchemaConformanceExpr`
   */
  private def toSchemaConformanceExpr(map: Map[String, String]): SchemaConformanceExpr = {
    assert(map.contains(SourceColStr), s"'$SourceColStr' must be defined.")
    assert(map.contains(DataTypeStr), s"'$DataTypeStr' must be defined.")

    SchemaConformanceExpr(map(SourceColStr), DataType.fromDDL(map(DataTypeStr)))
  }
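
  // Illustrative conversion (hypothetical column name): Map("source.col" -> "age", "type" -> "int")
  // yields SchemaConformanceExpr("age", IntegerType), since the type string is parsed with DataType.fromDDL.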

  /**
   * Validates that the expression is neither null nor empty, along with some dataset-specific validations.
   */
  override def validate(): Unit = {
    assert(exprOpt.isDefined, s"'$Expression' must be defined.")
    assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")

    val datasourceName = measureParam.getDataSource
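
    // Note: the dataset's column names are lowercased before the existence check below, so each
    // configured `source.col` is expected to match the lowercase column name.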
    val dataSourceCols =
      sparkSession.read.table(datasourceName).columns.map(_.toLowerCase(Locale.ROOT)).toSet

    val schemaConformanceExprs = exprOpt.get.map(toSchemaConformanceExpr).distinct
    val forDataSource =
      schemaConformanceExprs.map(e => (e.sourceCol, dataSourceCols.contains(e.sourceCol)))
    val invalidColsDataSource = forDataSource.filterNot(_._2)

    assert(
      invalidColsDataSource.isEmpty,
      s"Column(s) [${invalidColsDataSource.map(_._1).mkString(", ")}] " +
        s"do not exist in data set with name '$datasourceName'")
  }
}