[GRIFFIN-358] Added documentation for SchemaConformanceMeasure
diff --git a/griffin-doc/measure/measure-configuration-guide/schema_conformance.md b/griffin-doc/measure/measure-configuration-guide/schema_conformance.md
new file mode 100644
index 0000000..768c0ca
--- /dev/null
+++ b/griffin-doc/measure/measure-configuration-guide/schema_conformance.md
@@ -0,0 +1,218 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+SchemaConformance Measure - Configuration Guide
+=====================================
+
+### Introduction
+
+Schema Conformance ensures that the attributes of a given dataset follow a set of standard definitions in terms of data
+type. Most binary file formats (orc, avro, etc.) and tabular sources (Hive, RDBMS, etc.) already impose type constraints
+on the data they represent. However, text based formats like csv, json, xml, etc. do not retain schema information. Such
+formats must be explicitly validated for attribute type conformance.
+
+For example, date of birth of customer should be a date, age should be an integer.
+
+### Configuration
+
+The schema conformance measure can be configured as below,
+
+```json
+{
+  ...
+
+  "measures": [
+    {
+      "name": "schema_conformance_measure",
+      "type": "schemaConformance",
+      "data.source": "crime_report_source",
+      "config": {
+        "expr": [
+          {
+            "source.col": "zipcode_str",
+            "type": "int"
+          }
+        ]
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "schema_conformance_metric"
+        },
+        {
+          "type": "record",
+          "name": "schema_conformance_records"
+        }
+      ]
+    }
+  ]
+
+  ...
+}
+ ```
+
+##### Key Parameters:
+
+| Name    | Type     | Description                            | Supported Values                                          |
+|:--------|:---------|:---------------------------------------|:----------------------------------------------------------|
+| name    | `String` | User-defined name of this measure      | -                                                         |
+| type    | `String` | Type of Measure                        | completeness, duplication, profiling, accuracy, sparksql, schemaConformance  |
+| data.source | `String` | Name of data source on which this measure is applied  | -                                      |
+| config  | `Object` | Configuration params of the measure    | Depends on measure type ([see below](#example-config-object))                       |
+| out     | `List  ` | Define output(s) of measure execution  | [See below](#outputs)                                               |
+
+##### Example `config` Object:
+
+`config` object for schema conformance measure contains only one key `expr`. The value for `expr` is a json array of
+mapping objects where each object has 2 fields - `source.col` and `type`. Each `source.col` must exist in the source
+data set and will be checked for type. This key is mandatory and `expr` array must not be empty i.e., at least one
+mapping must be defined.
+
+It can be defined as mentioned below,
+
+```json
+{
+  ...
+
+  "config": {
+    "expr": [
+      ...
+
+      {
+        "source.col": "zipcode_str",
+        "type": "int"
+      }
+
+      ...
+    ]
+  }
+
+  ...
+}
+ ```
+
+### Outputs
+
+Schema Conformance measure supports the following two outputs,
+
+- Metrics
+- Records
+
+Users can choose to define any combination of these 2 outputs. For no outputs, skip this `out: [ ... ]` section from the
+measure configuration.
+
+#### Metrics Outputs
+
+To write metrics for Schema Conformance measure, configure the measure with output section as below,
+
+```json
+{
+  ...
+
+  "out": [
+    {
+      "type": "metric",
+      "name": "schema_conformance_metric"
+    }
+  ]
+
+  ...
+}
+ ```
+
+This will generate the metrics like below,
+
+```json
+{
+  "applicationId": "local-1623452147576",
+  "job_name": "Batch-All-Measures-Example",
+  "tmst": 1623452154931,
+  "measure_name": "schema_conformance_measure",
+  "metrics": [
+    {
+      "metric_name": "total",
+      "metric_value": "4617"
+    },
+    {
+      "metric_name": "complete",
+      "metric_value": "4459"
+    },
+    {
+      "metric_name": "incomplete",
+      "metric_value": "158"
+    }
+  ],
+  "measure_type": "SchemaConformance",
+  "data_source": "crime_report_source"
+}
+```
+
+#### Record Outputs
+
+To write records as output for Schema Conformance measure, configure the measure with output section as below,
+
+```json
+{
+  ...
+
+  "out": [
+    {
+      "type": "record",
+      "name": "schema_conformance_records"
+    }
+  ]
+
+  ...
+}
+ ```
+
+The above configuration will generate the records output like below,
+
+```
++-------------------+--------------------+--------------------+---------+-----------+-------+-------------+--------+
+|          date_time|            incident|             address|     city|zipcode_str|zipcode|       __tmst|__status|
++-------------------+--------------------+--------------------+---------+-----------+-------+-------------+--------+
+|2015-05-26 05:56:00|PENAL CODE/MISC (...|3900 Block BLOCK ...|PALO ALTO|      94306|  94306|1623452414131|    good|
+|2015-05-26 05:56:00|DRUNK IN PUBLIC A...|3900 Block BLOCK ...|PALO ALTO|      94306|  94306|1623452414131|    good|
+|2015-05-26 05:56:00|PENAL CODE/MISC (...|3900 Block BLOCK ...|PALO ALTO|      ""   |  null |1623452414131|    bad |
+|2015-05-26 05:56:00|PENAL CODE/MISC (...|3900 Block BLOCK ...|PALO ALTO|      94306|  94306|1623452414131|    good|
+|2015-05-26 08:00:00|N&D/POSSESSION (1...|WILKIE WAY & JAME...|PALO ALTO|      94306|  94306|1623452414131|    good|
+|2015-05-26 08:00:00|N&D/PARAPHERNALIA...|WILKIE WAY & JAME...|PALO ALTO|      94306|  94306|1623452414131|    good|
+|2015-05-26 08:00:00|TRAFFIC/SUSPENDED...|WILKIE WAY & JAME...|PALO ALTO|      94306|  94306|1623452414131|    good|
+|2015-05-26 10:30:00|TRAFFIC/MISC (TRA...|EL CAMINO REAL & ...|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 11:31:00|PENAL CODE/MISC (...|   400 Block ALMA ST|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 12:04:00|  B&P/MISC (B&PMISC)|  700 Block URBAN LN|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 12:25:00|PENAL CODE/MISC (...|   500 Block HIGH ST|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 13:06:00|WARRANT/OTHER AGE...| 800 Block BRYANT ST|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 13:30:00|THEFT GRAND/BIKE/...|1 Block UNIVERSIT...|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 13:35:00|PENAL CODE/MISC (...| 800 Block BRYANT ST|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 14:30:00|F&W/BRANDISHING (...|2200 Block EL CAM...|PALO ALTO|      ""   |  null |1623452414131|    bad |
+|2015-05-26 14:43:00|ACCIDENT MINOR IN...|3300 Block BLOCK ...|PALO ALTO|      94306|  94306|1623452414131|    good|
+|2015-05-26 15:22:00|THEFT PETTY/MISC ...|    300 Block POE ST|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 16:31:00|  B&P/MISC (B&PMISC)|500 Block WAVERLE...|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 16:48:00|MUNI CODE/MISC (1...|200 Block UNIVERS...|PALO ALTO|      94301|  94301|1623452414131|    good|
+|2015-05-26 16:48:00|MUNI CODE/MISC (1...|200 Block UNIVERS...|PALO ALTO|      94301|  94301|1623452414131|    good|
++-------------------+--------------------+--------------------+---------+-----------+-------+-------------+--------+
+only showing top 20 rows
+ ```
+
+A new column `__status` has been added to the source data set that acted as input to this measure. The value of this
+column can be either `bad` or `good` which can be used to calculate the metrics/ separate data based on quality etc.
+
+_Note:_ This output is for `ConsoleSink`. 
\ No newline at end of file
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
index eebd979..2111d6a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
@@ -94,13 +94,7 @@
     val givenExprs = exprOpt.get.map(toSchemaConformanceExpr).distinct
 
     val incompleteExpr = givenExprs
-      .map(
-        e =>
-          when(
-            col(e.sourceCol).isNull or
-              col(e.sourceCol).cast(e.dataType).isNull,
-            true)
-            .otherwise(false))
+      .map(e => when(col(e.sourceCol).cast(e.dataType).isNull, true).otherwise(false))
       .reduce(_ or _)
 
     val selectCols =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala
new file mode 100644
index 0000000..e705675
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure._
+
+class SchemaConformanceMeasureTest extends MeasureTest {
+  var param: MeasureParam = _
+
+  final val SourceColStr: String = "source.col"
+  final val DataTypeStr: String = "type"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    param = MeasureParam(
+      "param",
+      "SchemaConformance",
+      "source",
+      Map(Expression -> Seq(Map(SourceColStr -> "id", DataTypeStr -> "int"))))
+  }
+
+  "SchemaConformanceMeasure" should "validate expression config" in {
+
+    // Validations for SchemaConformance Expr
+
+    // Empty
+    assertThrows[AssertionError] {
+      SchemaConformanceMeasure(spark, param.copy(config = Map.empty[String, String]))
+    }
+
+    // Incorrect Type and Empty
+    assertThrows[AssertionError] {
+      SchemaConformanceMeasure(spark, param.copy(config = Map(Expression -> StringUtils.EMPTY)))
+    }
+
+    // Null
+    assertThrows[AssertionError] {
+      SchemaConformanceMeasure(spark, param.copy(config = Map(Expression -> null)))
+    }
+
+    // Incorrect Type
+    assertThrows[AssertionError] {
+      SchemaConformanceMeasure(spark, param.copy(config = Map(Expression -> "gender")))
+    }
+
+    // Correct Type and Empty
+    assertThrows[AssertionError] {
+      SchemaConformanceMeasure(
+        spark,
+        param.copy(config = Map(Expression -> Seq.empty[Map[String, String]])))
+    }
+
+    // Invalid Expr
+    assertThrows[AssertionError] {
+      SchemaConformanceMeasure(
+        spark,
+        param.copy(config = Map(Expression -> Seq(Map("a" -> "b")))))
+    }
+
+    // Invalid Expr as type is missing
+    assertThrows[AssertionError] {
+      SchemaConformanceMeasure(
+        spark,
+        param.copy(config = Map(Expression -> Seq(Map(SourceColStr -> "b")))))
+    }
+
+    // Invalid Expr as source.col is missing
+    assertThrows[AssertionError] {
+      SchemaConformanceMeasure(
+        spark,
+        param.copy(config = Map(Expression -> Seq(Map(DataTypeStr -> "b")))))
+    }
+  }
+
+  it should "support metric writing" in {
+    val measure = SchemaConformanceMeasure(spark, param)
+    assertResult(true)(measure.supportsMetricWrite)
+  }
+
+  it should "support record writing" in {
+    val measure = SchemaConformanceMeasure(spark, param)
+    assertResult(true)(measure.supportsRecordWrite)
+  }
+
+  it should "execute defined measure expr" in {
+    val measure = SchemaConformanceMeasure(spark, param)
+    val (recordsDf, metricsDf) = measure.execute(None)
+
+    assertResult(recordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
+
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
+
+    val metricMap = metricsDf
+      .head()
+      .getAs[Seq[Map[String, String]]](Metrics)
+      .map(x => x(MetricName) -> x(MetricValue))
+      .toMap
+
+    assertResult(metricMap(Total))("5")
+    assertResult(metricMap(measure.Complete))("5")
+    assertResult(metricMap(measure.InComplete))("0")
+  }
+
+}