[GRIFFIN-358] Added SchemaConformance measure
diff --git a/measure/src/main/resources/config-batch-all-measures.json b/measure/src/main/resources/config-batch-all-measures.json
index 8917ccf..8f911de 100644
--- a/measure/src/main/resources/config-batch-all-measures.json
+++ b/measure/src/main/resources/config-batch-all-measures.json
@@ -15,8 +15,8 @@
         },
         "pre.proc": [
           "select split(value, ',') as part from this",
-          "select part[0] as date_time, part[1] as incident, part[2] as address, part[3] as city, part[4] as zipcode from this",
-          "select cast(date_time as timestamp) as date_time, incident, address, city, cast(zipcode as int) as zipcode from this"
+          "select part[0] as date_time, part[1] as incident, part[2] as address, part[3] as city, part[4] as zipcode_str from this",
+          "select cast(date_time as timestamp) as date_time, incident, address, city, zipcode_str, cast(zipcode_str as int) as zipcode from this"
         ]
       }
     },
@@ -48,8 +48,7 @@
       "out": [
         {
           "type": "metric",
-          "name": "comp_metric",
-          "flatten": "map"
+          "name": "comp_metric"
         },
         {
           "type": "record",
@@ -69,8 +68,7 @@
       "out": [
         {
           "type": "metric",
-          "name": "prof_metric",
-          "flatten": "map"
+          "name": "prof_metric"
         }
       ]
     },
@@ -85,8 +83,7 @@
       "out": [
         {
           "type": "metric",
-          "name": "spark_sql_metric",
-          "flatten": "map"
+          "name": "spark_sql_metric"
         },
         {
           "type": "record",
@@ -104,8 +101,7 @@
       "out": [
         {
           "type": "metric",
-          "name": "duplication_metric",
-          "flatten": "map"
+          "name": "duplication_metric"
         },
         {
           "type": "record",
@@ -129,14 +125,36 @@
       "out": [
         {
           "type": "metric",
-          "name": "accuracy_metric",
-          "flatten": "map"
+          "name": "accuracy_metric"
         },
         {
           "type": "record",
           "name": "accuracy_records"
         }
       ]
+    },
+    {
+      "name": "schema_conformance_measure",
+      "type": "schemaConformance",
+      "data.source": "crime_report_source",
+      "config": {
+        "expr": [
+          {
+            "source.col": "zipcode_str",
+            "type": "int"
+          }
+        ]
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "schema_conformance_metric"
+        },
+        {
+          "type": "record",
+          "name": "schema_conformance_records"
+        }
+      ]
     }
   ],
   "sinks": [
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
index 0a55cee..8b12b6d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
@@ -25,7 +25,7 @@
 
   type MeasureType = Value
 
-  val Completeness, Duplication, Profiling, Accuracy, SparkSQL = Value
+  val Completeness, Duplication, Profiling, Accuracy, SparkSQL, SchemaConformance = Value
 
   override def withNameWithDefault(name: String): MeasureType = {
     super.withNameWithDefault(name)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
new file mode 100644
index 0000000..eebd979
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import java.util.Locale
+
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DataType, StringType}
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure
+
+/**
+ * SchemaConformance Measure.
+ *
+ * Schema Conformance ensure that the attributes of a given dataset follow a set of
+ * standard data definitions in terms of data type.
+ * Most binary file formats (orc, avro, etc.) and tabular sources (Hive, RDBMS, etc.)
+ * already impose type constraints on the data they represent but text based formats like
+ * csv, json, xml, etc. do not retain schema information.
+ *
+ * For example, date of birth of customer should be a date, age should be an integer.
+ *
+ * @param sparkSession SparkSession for this Griffin Application.
+ * @param measureParam Object representation of this measure and its configuration.
+ */
+case class SchemaConformanceMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
+    extends Measure {
+
+  import Measure._
+
+  /**
+   * Representation of a single SchemaConformance expression object.
+   *
+   * @param sourceCol name of source column
+   * @param dataType name of reference column
+   */
+  case class SchemaConformanceExpr(sourceCol: String, dataType: DataType)
+
+  /**
+   * SchemaConformance Constants
+   */
+  final val SourceColStr: String = "source.col"
+  final val DataTypeStr: String = "type"
+  final val Complete: String = "complete"
+  final val InComplete: String = "incomplete"
+
+  /**
+   * SchemaConformance measure supports record and metric write
+   */
+  override val supportsRecordWrite: Boolean = true
+
+  override val supportsMetricWrite: Boolean = true
+
+  /**
+   * The value for expr is a json array of mapping objects where each object has 2 fields -
+   * `source.col` and `type`. Each `source.col` must exist in the source data set and is checked
+   * to be of type `type`.
+   * This key is mandatory and expr array must not be empty i.e. at least one mapping must be defined.
+   */
+  val exprOpt: Option[Seq[Map[String, String]]] =
+    Option(getFromConfig[Seq[Map[String, String]]](Expression, null))
+
+  validate()
+
+  /**
+   * SchemaConformance measure evaluates the user provided `expr` for each row of the input dataset.
+   * Each row that fails this type expression is tagged as incomplete record(s), all other record(s) are complete.
+   *
+   * SchemaConformance produces the following 3 metrics as result,
+   *  - Total records
+   *  - Complete records
+   *  - Incomplete records
+   *
+   *  @return tuple of records dataframe and metric dataframe
+   */
+  override def impl(): (DataFrame, DataFrame) = {
+    val givenExprs = exprOpt.get.map(toSchemaConformanceExpr).distinct
+
+    val incompleteExpr = givenExprs
+      .map(
+        e =>
+          when(
+            col(e.sourceCol).isNull or
+              col(e.sourceCol).cast(e.dataType).isNull,
+            true)
+            .otherwise(false))
+      .reduce(_ or _)
+
+    val selectCols =
+      Seq(Total, Complete, InComplete).map(e =>
+        map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+    val metricColumn: Column = array(selectCols: _*).as(valueColumn)
+
+    val input = sparkSession.read.table(measureParam.getDataSource)
+    val badRecordsDf = input.withColumn(valueColumn, when(incompleteExpr, 1).otherwise(0))
+
+    val metricDf = badRecordsDf
+      .withColumn(Total, lit(1))
+      .agg(sum(Total).as(Total), sum(valueColumn).as(InComplete))
+      .withColumn(Complete, col(Total) - col(InComplete))
+      .select(metricColumn)
+
+    (badRecordsDf, metricDf)
+  }
+
+  /**
+   * JSON representation of the  `expr` is deserialized as Map internally which is now converted to an
+   * `SchemaConformanceExpr` representation for a fixed structure across all expression object(s).
+   *
+   * @param map map representation of the `expr`
+   * @return instance of `SchemaConformanceExpr`
+   */
+  private def toSchemaConformanceExpr(map: Map[String, String]): SchemaConformanceExpr = {
+    assert(map.contains(SourceColStr), s"'$SourceColStr' must be defined.")
+    assert(map.contains(DataTypeStr), s"'$DataTypeStr' must be defined.")
+
+    SchemaConformanceExpr(map(SourceColStr), DataType.fromDDL(map(DataTypeStr)))
+  }
+
+  /**
+   * Validates if the expression is not null and non empty along with some dataset specific validations.
+   */
+  override def validate(): Unit = {
+    assert(exprOpt.isDefined, s"'$Expression' must be defined.")
+    assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
+
+    val datasourceName = measureParam.getDataSource
+
+    val dataSourceCols =
+      sparkSession.read.table(datasourceName).columns.map(_.toLowerCase(Locale.ROOT)).toSet
+    val schemaConformanceExprExpr = exprOpt.get.map(toSchemaConformanceExpr).distinct
+
+    val forDataSource =
+      schemaConformanceExprExpr.map(e => (e.sourceCol, dataSourceCols.contains(e.sourceCol)))
+    val invalidColsDataSource = forDataSource.filterNot(_._2)
+
+    assert(
+      invalidColsDataSource.isEmpty,
+      s"Column(s) [${invalidColsDataSource.map(_._1).mkString(", ")}] " +
+        s"do not exist in data set with name '$datasourceName'")
+  }
+
+}