[GRIFFIN-358] Added SchemaConformance measure
diff --git a/measure/src/main/resources/config-batch-all-measures.json b/measure/src/main/resources/config-batch-all-measures.json
index 8917ccf..8f911de 100644
--- a/measure/src/main/resources/config-batch-all-measures.json
+++ b/measure/src/main/resources/config-batch-all-measures.json
@@ -15,8 +15,8 @@
},
"pre.proc": [
"select split(value, ',') as part from this",
- "select part[0] as date_time, part[1] as incident, part[2] as address, part[3] as city, part[4] as zipcode from this",
- "select cast(date_time as timestamp) as date_time, incident, address, city, cast(zipcode as int) as zipcode from this"
+ "select part[0] as date_time, part[1] as incident, part[2] as address, part[3] as city, part[4] as zipcode_str from this",
+ "select cast(date_time as timestamp) as date_time, incident, address, city, zipcode_str, cast(zipcode_str as int) as zipcode from this"
]
}
},
@@ -48,8 +48,7 @@
"out": [
{
"type": "metric",
- "name": "comp_metric",
- "flatten": "map"
+ "name": "comp_metric"
},
{
"type": "record",
@@ -69,8 +68,7 @@
"out": [
{
"type": "metric",
- "name": "prof_metric",
- "flatten": "map"
+ "name": "prof_metric"
}
]
},
@@ -85,8 +83,7 @@
"out": [
{
"type": "metric",
- "name": "spark_sql_metric",
- "flatten": "map"
+ "name": "spark_sql_metric"
},
{
"type": "record",
@@ -104,8 +101,7 @@
"out": [
{
"type": "metric",
- "name": "duplication_metric",
- "flatten": "map"
+ "name": "duplication_metric"
},
{
"type": "record",
@@ -129,14 +125,36 @@
"out": [
{
"type": "metric",
- "name": "accuracy_metric",
- "flatten": "map"
+ "name": "accuracy_metric"
},
{
"type": "record",
"name": "accuracy_records"
}
]
+ },
+ {
+ "name": "schema_conformance_measure",
+ "type": "schemaConformance",
+ "data.source": "crime_report_source",
+ "config": {
+ "expr": [
+ {
+ "source.col": "zipcode_str",
+ "type": "int"
+ }
+ ]
+ },
+ "out": [
+ {
+ "type": "metric",
+ "name": "schema_conformance_metric"
+ },
+ {
+ "type": "record",
+ "name": "schema_conformance_records"
+ }
+ ]
}
],
"sinks": [
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
index 0a55cee..8b12b6d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
@@ -25,7 +25,7 @@
type MeasureType = Value
- val Completeness, Duplication, Profiling, Accuracy, SparkSQL = Value
+ val Completeness, Duplication, Profiling, Accuracy, SparkSQL, SchemaConformance = Value
override def withNameWithDefault(name: String): MeasureType = {
super.withNameWithDefault(name)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
new file mode 100644
index 0000000..eebd979
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import java.util.Locale
+
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DataType, StringType}
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure
+
+/**
+ * SchemaConformance Measure.
+ *
+ * Schema Conformance ensures that the attributes of a given dataset follow a set of
+ * standard data definitions in terms of data type.
+ * Most binary file formats (ORC, Avro, etc.) and tabular sources (Hive, RDBMS, etc.)
+ * already impose type constraints on the data they represent, but text-based formats
+ * like CSV, JSON, XML, etc. do not retain schema information.
+ *
+ * For example, a customer's date of birth should be a date, and age should be an integer.
+ *
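+ * A minimal measure definition, mirroring the one added to
+ * config-batch-all-measures.json in this change,
+ * {{{
+ *   {
+ *     "name": "schema_conformance_measure",
+ *     "type": "schemaConformance",
+ *     "data.source": "crime_report_source",
+ *     "config": { "expr": [ { "source.col": "zipcode_str", "type": "int" } ] }
+ *   }
+ * }}}
+ *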
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
+case class SchemaConformanceMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
+ extends Measure {
+
+ import Measure._
+
+ /**
+ * Representation of a single SchemaConformance expression object.
+ *
+   * @param sourceCol name of the source column
+   * @param dataType expected data type of the source column
+ */
+ case class SchemaConformanceExpr(sourceCol: String, dataType: DataType)
+
+ /**
+ * SchemaConformance Constants
+ */
+ final val SourceColStr: String = "source.col"
+ final val DataTypeStr: String = "type"
+ final val Complete: String = "complete"
+ final val InComplete: String = "incomplete"
+
+ /**
+ * SchemaConformance measure supports record and metric write
+ */
+ override val supportsRecordWrite: Boolean = true
+
+ override val supportsMetricWrite: Boolean = true
+
+ /**
+   * The value for `expr` is a JSON array of mapping objects where each object has two
+   * fields: `source.col` and `type`. Each `source.col` must exist in the source data set
+   * and is checked to be of type `type`.
+   * This key is mandatory and the `expr` array must not be empty, i.e. at least one
+   * mapping must be defined.
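+   *
+   * For example, to assert that the `zipcode_str` column holds values castable to an
+   * integer (mirroring the sample job configuration in this change),
+   * {{{
+   *   "expr": [ { "source.col": "zipcode_str", "type": "int" } ]
+   * }}}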
+ */
+ val exprOpt: Option[Seq[Map[String, String]]] =
+ Option(getFromConfig[Seq[Map[String, String]]](Expression, null))
+
+ validate()
+
+ /**
+   * SchemaConformance measure evaluates the user-provided `expr` for each row of the input dataset.
+   * Each row that fails this type check is tagged as an incomplete record; all other records are complete.
+   *
+   * SchemaConformance produces the following 3 metrics as its result,
+ * - Total records
+ * - Complete records
+ * - Incomplete records
+ *
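+   * As an illustration, 100 input rows of which 5 fail their casts would serialize
+   * (assuming `MetricName` and `MetricValue` from [[Measure]] resolve to `metric_name`
+   * and `metric_value`) along the lines of,
+   * {{{
+   *   [{"metric_name": "total", "metric_value": "100"},
+   *    {"metric_name": "complete", "metric_value": "95"},
+   *    {"metric_name": "incomplete", "metric_value": "5"}]
+   * }}}
+   *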
+ * @return tuple of records dataframe and metric dataframe
+ */
+ override def impl(): (DataFrame, DataFrame) = {
+ val givenExprs = exprOpt.get.map(toSchemaConformanceExpr).distinct
+
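+    // Spark's cast yields null for values that cannot be converted to the target type, so
+    // a row is flagged as incomplete if any configured column is null or fails its cast.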
+ val incompleteExpr = givenExprs
+ .map(
+ e =>
+ when(
+ col(e.sourceCol).isNull or
+ col(e.sourceCol).cast(e.dataType).isNull,
+ true)
+ .otherwise(false))
+ .reduce(_ or _)
+
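+    // Assemble the metric column as an array of (MetricName, MetricValue) maps, one entry
+    // each for the total, complete and incomplete counts.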
+ val selectCols =
+ Seq(Total, Complete, InComplete).map(e =>
+ map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+ val metricColumn: Column = array(selectCols: _*).as(valueColumn)
+
+ val input = sparkSession.read.table(measureParam.getDataSource)
+ val badRecordsDf = input.withColumn(valueColumn, when(incompleteExpr, 1).otherwise(0))
+
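+    // Count every row as 1, sum the incomplete flags, and derive complete = total - incomplete.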
+ val metricDf = badRecordsDf
+ .withColumn(Total, lit(1))
+ .agg(sum(Total).as(Total), sum(valueColumn).as(InComplete))
+ .withColumn(Complete, col(Total) - col(InComplete))
+ .select(metricColumn)
+
+ (badRecordsDf, metricDf)
+ }
+
+ /**
+   * The JSON representation of `expr` is deserialized internally as a Map, which is then
+   * converted to a `SchemaConformanceExpr` for a fixed structure across all expression object(s).
+ *
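+   * For example, `Map("source.col" -> "zipcode_str", "type" -> "int")` becomes
+   * `SchemaConformanceExpr("zipcode_str", IntegerType)` via `DataType.fromDDL`.
+   *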
+ * @param map map representation of the `expr`
+ * @return instance of `SchemaConformanceExpr`
+ */
+ private def toSchemaConformanceExpr(map: Map[String, String]): SchemaConformanceExpr = {
+ assert(map.contains(SourceColStr), s"'$SourceColStr' must be defined.")
+ assert(map.contains(DataTypeStr), s"'$DataTypeStr' must be defined.")
+
+ SchemaConformanceExpr(map(SourceColStr), DataType.fromDDL(map(DataTypeStr)))
+ }
+
+ /**
+   * Validates that the expression is defined and non-empty, along with some dataset-specific validations.
+ */
+ override def validate(): Unit = {
+ assert(exprOpt.isDefined, s"'$Expression' must be defined.")
+ assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
+
+ val datasourceName = measureParam.getDataSource
+
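+    // Every configured source column must exist in the data source; column names are
+    // lower-cased on both sides to match Spark's default case-insensitive resolution.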
+ val dataSourceCols =
+ sparkSession.read.table(datasourceName).columns.map(_.toLowerCase(Locale.ROOT)).toSet
+    val schemaConformanceExprs = exprOpt.get.map(toSchemaConformanceExpr).distinct
+
+    val forDataSource =
+      schemaConformanceExprs.map(e =>
+        (e.sourceCol, dataSourceCols.contains(e.sourceCol.toLowerCase(Locale.ROOT))))
+    val invalidColsDataSource = forDataSource.filterNot(_._2)
+
+    assert(
+      invalidColsDataSource.isEmpty,
+      s"Column(s) [${invalidColsDataSource.map(_._1).mkString(", ")}] " +
+        s"do not exist in the data set with name '$datasourceName'")
+ }
+
+}