[GRIFFIN-365] Measure Enhancements and Stability fixes (#593)

* [GRIFFIN-365] Update pom.xml with scapegoat and other changes

* [GRIFFIN-365] Remove ban on elasticsearch-spark dependency

* [GRIFFIN-365] Measure enhancements

* [GRIFFIN-365] Fix test cases

* [GRIFFIN-365] Updates to documentation and fix for breaking tests

* [GRIFFIN-365] Revert elasticsearch changes

* Update schema_conformance.md

* Update sparksql.md
diff --git a/griffin-doc/measure/measure-configuration-guide/accuracy.md b/griffin-doc/measure/measure-configuration-guide/accuracy.md
index 95fe54b..f0ad0e5 100644
--- a/griffin-doc/measure/measure-configuration-guide/accuracy.md
+++ b/griffin-doc/measure/measure-configuration-guide/accuracy.md
@@ -242,4 +242,4 @@
 A new column `__status` has been added to the original data set on which this measure was executed. The value of this
 column can be either `bad` or `good` which can be used to calculate the metrics/ separate data based on quality etc.
 
-_Note:_ This output is for `ConsoleSink`. 
\ No newline at end of file
+_Note:_ These outputs are for `ConsoleSink`. 
\ No newline at end of file
diff --git a/griffin-doc/measure/measure-configuration-guide/completeness.md b/griffin-doc/measure/measure-configuration-guide/completeness.md
index accd5a5..e932156 100644
--- a/griffin-doc/measure/measure-configuration-guide/completeness.md
+++ b/griffin-doc/measure/measure-configuration-guide/completeness.md
@@ -31,9 +31,9 @@
 users to define SQL-like expressions which describe their definition of completeness. For a tabular data set with
 columns `name`, `email` and `age`, some examples of such completeness definitions are mentioned below,
 
-- `name is NULL`
-- `name is NULL and age is NULL`
-- `email NOT RLIKE '^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'`
+- `name is NOT NULL`
+- `name is NOT NULL and age is NOT NULL`
+- `email RLIKE '^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'`
 
 ### Configuration
 
@@ -49,7 +49,7 @@
       "type": "completeness",
       "data.source": "crime_report_source",
       "config": {
-        "expr": "zipcode is null OR city is null"
+        "expr": "zipcode is not null and city is not null"
       },
       "out": [
         {
@@ -84,8 +84,8 @@
 `config` object for completeness measure contains only one key `expr`. The value for `expr` is a SQL-like expression
 string which definition this completeness. For more complex definitions, expressions can be clubbed with `AND` and `OR`.
 
-_Note:_ This expression describes the bad or incomplete records. This means that for `"expr": "zipcode is NULL"` the
-records which contain `null` in zipcode column are considered as incomplete.
+_Note:_ This expression describes the good or complete records. This means that for `"expr": "zipcode is NOT NULL"` the
+records which contain `null` in zipcode column are incomplete.
 
 It can be defined as mentioned below,
 
@@ -94,7 +94,7 @@
   ...
 
   "config": {
-    "expr": "zipcode is null OR city is null"
+    "expr": "zipcode is NOT null AND city is NOT null"
   }
 
   ...
@@ -136,7 +136,6 @@
 {
   "applicationId": "local-1623452412322",
   "job_name": "Batch-All-Measures-Example",
-  "tmst": 1623452423891,
   "measure_name": "completeness_measure",
   "metrics": [
     {
@@ -209,4 +208,4 @@
 A new column `__status` has been added to the original data set on which this measure was executed. The value of this
 column can be either `bad` or `good` which can be used to calculate the metrics/ separate data based on quality etc.
 
-_Note:_ This output is for `ConsoleSink`. 
\ No newline at end of file
+_Note:_ These outputs are for `ConsoleSink`. 
\ No newline at end of file
diff --git a/griffin-doc/measure/measure-configuration-guide/duplication.md b/griffin-doc/measure/measure-configuration-guide/duplication.md
index 15e7408..71a79df 100644
--- a/griffin-doc/measure/measure-configuration-guide/duplication.md
+++ b/griffin-doc/measure/measure-configuration-guide/duplication.md
@@ -254,4 +254,4 @@
 column can be either `bad` or `good` which can be used to calculate the metrics/ separate data based on quality etc.
 These values for `__status` column are based on the value of user-defined key `bad.record.definition`.
 
-_Note:_ This output is for `ConsoleSink`. 
\ No newline at end of file
+_Note:_ These outputs are for `ConsoleSink`. 
\ No newline at end of file
diff --git a/griffin-doc/measure/measure-configuration-guide/profiling.md b/griffin-doc/measure/measure-configuration-guide/profiling.md
index b6ee6fa..5c7873b 100644
--- a/griffin-doc/measure/measure-configuration-guide/profiling.md
+++ b/griffin-doc/measure/measure-configuration-guide/profiling.md
@@ -207,4 +207,6 @@
 ```
 
 _Note:_ Some mathematical metrics are bound to the type of attribute under consideration, for example standard deviation
-cannot be calculated for a column name of string type, thus, the value for these metrics are null for such columns.
\ No newline at end of file
+cannot be calculated for a column name of string type, thus, the value for these metrics are null for such columns.
+
+_Note:_ This output is for `ConsoleSink`. 
\ No newline at end of file
diff --git a/griffin-doc/measure/measure-configuration-guide/schema_conformance.md b/griffin-doc/measure/measure-configuration-guide/schema_conformance.md
index 768c0ca..2f47dde 100644
--- a/griffin-doc/measure/measure-configuration-guide/schema_conformance.md
+++ b/griffin-doc/measure/measure-configuration-guide/schema_conformance.md
@@ -215,4 +215,4 @@
 A new column `__status` has been added to the source data set that acted as input to this measure. The value of this
 column can be either `bad` or `good` which can be used to calculate the metrics/ separate data based on quality etc.
 
-_Note:_ This output is for `ConsoleSink`. 
\ No newline at end of file
+_Note:_ These outputs are for `ConsoleSink`. 
diff --git a/griffin-doc/measure/measure-configuration-guide/sparksql.md b/griffin-doc/measure/measure-configuration-guide/sparksql.md
index d8ca39e..e82c743 100644
--- a/griffin-doc/measure/measure-configuration-guide/sparksql.md
+++ b/griffin-doc/measure/measure-configuration-guide/sparksql.md
@@ -211,7 +211,7 @@
 column can be either `bad` or `good` which can be used to calculate the metrics/ separate data based on quality etc.
 These values for `__status` column are based on the value of user-defined key `bad.record.definition`.
 
-_Note:_ This output is for `ConsoleSink`.
+_Note:_ These outputs are for `ConsoleSink`.
 
 **Further Reading**
 
@@ -224,4 +224,4 @@
 `bad.record.definition: "__measure_{measure_name}"` in the measure config.
 
 For example if the user-defined name of the measure is `spark_sql_measure`, the alias column name
-becomes `__measure_spark_sql_measure`.
\ No newline at end of file
+becomes `__measure_spark_sql_measure`.
diff --git a/measure/pom.xml b/measure/pom.xml
index 3cd8bc6..6d97107 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -58,6 +58,10 @@
         <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
         <mockito.version>3.2.2.0</mockito.version>
         <scalatest.version>3.2.3</scalatest.version>
+        <maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
+        <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
+        <scalatest.maven.plugin.version>2.0.2</scalatest.maven.plugin.version>
+        <scapegoat.version>1.4.8</scapegoat.version>
     </properties>
 
     <dependencies>
@@ -68,6 +72,13 @@
             <version>${scala.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.sksamuel.scapegoat</groupId>
+            <artifactId>scalac-scapegoat-plugin_${scala.version}</artifactId>
+            <version>${scapegoat.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Spark Dependencies -->
         <dependency>
             <groupId>org.apache.spark</groupId>
@@ -249,9 +260,13 @@
                         <arg>-deprecation</arg>
                         <arg>-feature</arg>
                         <arg>-explaintypes</arg>
+                        <arg>-P:scapegoat:dataDir:${project.build.directory}</arg>
+                        <arg>-P:scapegoat:reports:scalastyle</arg>
+                        <arg>
+                            -P:scapegoat:overrideLevels:UnsafeTraversableMethods=Warning:OptionGet=Warning:TryGet=Warning
+                        </arg>
                     </args>
                     <jvmArgs>
-                        <jvmArg>-Xms64m</jvmArg>
                         <jvmArg>-Xmx1024m</jvmArg>
                     </jvmArgs>
                     <javacArgs>
@@ -260,19 +275,41 @@
                         <javacArg>${java.version}</javacArg>
                         <javacArg>-Xlint:all,-serial,-path,-try</javacArg>
                     </javacArgs>
+                    <compilerPlugins>
+                        <compilerPlugin>
+                            <groupId>com.sksamuel.scapegoat</groupId>
+                            <artifactId>scalac-scapegoat-plugin_${scala.version}</artifactId>
+                            <version>${scapegoat.version}</version>
+                        </compilerPlugin>
+                    </compilerPlugins>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${maven.surefire.plugin.version}</version>
+                <configuration>
+                    <skipTests>true</skipTests>
                 </configuration>
             </plugin>
 
             <plugin>
                 <groupId>org.scalatest</groupId>
                 <artifactId>scalatest-maven-plugin</artifactId>
-                <version>2.0.2</version>
+                <version>${scalatest.maven.plugin.version}</version>
                 <configuration>
                     <testFailureIgnore>false</testFailureIgnore>
                     <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                     <junitxml>.</junitxml>
                     <filereports>TestSuiteReport.txt</filereports>
+                    <parallel>false</parallel>
+                    <forkMode>once</forkMode>
                     <stderr/>
+                    <argLine>
+                        -ea -Xmx4g -Xss4m -XX:ReservedCodeCacheSize=1g -Dio.netty.tryReflectionSetAccessible=true
+                        -Dfile.encoding=${project.build.sourceEncoding}
+                    </argLine>
                     <systemProperties>
                         <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
                     </systemProperties>
@@ -297,18 +334,29 @@
                         <goals>
                             <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.http</pattern>
+                                    <shadedPattern>griffin.org.apache.http</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.zaxxer</pattern>
+                                    <shadedPattern>griffin.com.zaxxer</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <finalName>${project.build.finalName}-with-dependencies</finalName>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.griffin.measure.Application</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
                     </execution>
                 </executions>
-                <configuration>
-                    <createDependencyReducedPom>false</createDependencyReducedPom>
-                    <minimizeJar>true</minimizeJar>
-                    <relocations>
-                        <relocation>
-                            <pattern>org.apache.http</pattern>
-                            <shadedPattern>griffin.org.apache.http</shadedPattern>
-                        </relocation>
-                    </relocations>
-                </configuration>
             </plugin>
 
             <plugin>
@@ -324,7 +372,7 @@
                 </configuration>
                 <executions>
                     <execution>
-                        <phase>compile</phase>
+                        <phase>validate</phase>
                         <goals>
                             <goal>format</goal>
                         </goals>
@@ -350,7 +398,7 @@
                 </configuration>
                 <executions>
                     <execution>
-                        <phase>compile</phase>
+                        <phase>validate</phase>
                         <goals>
                             <goal>check</goal>
                         </goals>
@@ -359,86 +407,18 @@
             </plugin>
 
             <plugin>
-                <groupId>org.jacoco</groupId>
-                <artifactId>jacoco-maven-plugin</artifactId>
-                <version>0.8.2</version>
-                <executions>
-                    <execution>
-                        <id>default-prepare-agent</id>
-                        <goals>
-                            <goal>prepare-agent</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>report</id>
-                        <phase>test</phase>
-                        <goals>
-                            <goal>report</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
                 <groupId>org.scoverage</groupId>
                 <artifactId>scoverage-maven-plugin</artifactId>
                 <version>${scoverage.plugin.version}</version>
                 <configuration>
                     <scalaVersion>${scala.version}</scalaVersion>
-                    <aggregate>true</aggregate>
+                    <aggregateOnly>true</aggregateOnly>
                     <highlighting>true</highlighting>
                 </configuration>
             </plugin>
 
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>3.2.0</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                        <configuration>
-                            <finalName>${project.artifactId}-${project.version}</finalName>
-                            <classifier>spark-${spark.major.version}_scala-${scala.binary.version}</classifier>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>3.3.0</version>
-                <configuration>
-                    <tarLongFileMode>posix</tarLongFileMode>
-                    <finalName>
-                        ${project.artifactId}-${project.version}-spark-${spark.major.version}_scala-${scala.binary.version}
-                    </finalName>
-                    <archive>
-                        <manifest>
-                            <mainClass>org.apache.griffin.measure.Application</mainClass>
-                        </manifest>
-                    </archive>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-enforcer-plugin</artifactId>
                 <version>3.0.0-M2</version>
                 <executions>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 1765f4d..7c695c6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -89,7 +89,7 @@
 
       assert(
         repeatedMeasures.isEmpty,
-        s"Measure names must be unique. " +
+        "Measure names must be unique. " +
           s"Duplicate Measures names ['${repeatedMeasures.mkString("', '")}'] were found.")
 
       val invalidMeasureSources = measures
@@ -100,7 +100,7 @@
 
       assert(
         invalidMeasureSources.isEmpty,
-        s"Measure source(s) undefined." +
+        "Measure source(s) undefined." +
           s" Unknown source(s) ['${invalidMeasureSources.mkString("', '")}'] were found.")
     } else if (evaluateRule != null) {
       evaluateRule.validate()
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index ab35fb6..6698726 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -75,7 +75,7 @@
   }
 
   def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
-    val path: String = s"/_sql?format=csv"
+    val path: String = "/_sql?format=csv"
     info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
     val dfOpt =
       try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
index 0caac17..cedcfda 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
@@ -98,10 +98,10 @@
    *
    * @return tuple of records dataframe and metric dataframe
    */
-  def impl(): (DataFrame, DataFrame)
+  def impl(dataSource: DataFrame): (DataFrame, DataFrame)
 
   /**
-   * Implementation should define validtion checks in this method (if required).
+   * Implementation should define validation checks in this method (if required).
    * This method needs to be called explicitly call this method (preferably during measure creation).
    *
    * Defaults to no-op.
@@ -111,35 +111,18 @@
   /**
    * Executes this measure specific transformation on input data source.
    *
-   * @param batchId batch id to append in case of streaming source.
    * @return enriched tuple of records dataframe and metric dataframe
    */
-  def execute(batchId: Option[Long] = None): (DataFrame, DataFrame) = {
-    val (recordsDf, metricDf) = impl()
+  def execute(dataSource: DataFrame): (DataFrame, DataFrame) = {
+    val (recordsDf, metricDf) = impl(dataSource)
 
     val processedRecordDf = preProcessRecords(recordsDf)
     val processedMetricDf = preProcessMetrics(metricDf)
 
-    val res = batchId match {
-      case Some(batchId) =>
-        implicit val bId: Long = batchId
-        (appendBatchIdIfAvailable(processedRecordDf), appendBatchIdIfAvailable(processedMetricDf))
-      case None => (processedRecordDf, processedMetricDf)
-    }
-
-    res
+    (processedRecordDf, processedMetricDf)
   }
 
-  /**
-   * Appends batch id to metrics in case of streaming sources.
-   *
-   * @param input metric dataframe
-   * @param batchId batch id to append
-   * @return updated metric dataframe
-   */
-  private def appendBatchIdIfAvailable(input: DataFrame)(implicit batchId: Long): DataFrame = {
-    input.withColumn(BatchId, typedLit[Long](batchId))
-  }
+  protected def nullToZero(column: Column): Column = when(column.isNull, 0).otherwise(column)
 
 }
 
@@ -152,7 +135,8 @@
   final val Expression = "expr"
   final val MeasureColPrefix = "__measure"
   final val Status = "__status"
-  final val BatchId = "__batch_id"
+  final val BatchId = "batch_id"
+
   final val MeasureName = "measure_name"
   final val MeasureType = "measure_type"
   final val MetricName = "metric_name"
@@ -164,6 +148,7 @@
   final val Total: String = "total"
   final val BadRecordDefinition = "bad.record.definition"
   final val AllColumns: String = "*"
+  final val RowNumber: String = "__row_number"
 
   final val emptyCol: Column = lit(StringUtils.EMPTY)
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index a84dff0..71a2ef3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -18,13 +18,11 @@
 package org.apache.griffin.measure.execution
 
 import java.util.Date
-import java.util.concurrent.Executors
 
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
+import scala.concurrent.{ExecutionContextExecutorService, Future}
 import scala.util._
 
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
@@ -55,7 +53,8 @@
  *
  * @param context Instance of `DQContext`
  */
-case class MeasureExecutor(context: DQContext) extends Loggable {
+case class MeasureExecutor(context: DQContext, ec: ExecutionContextExecutorService)
+    extends Loggable {
 
   /**
    * SparkSession for this Griffin Application.
@@ -69,19 +68,6 @@
     .getBoolean("spark.griffin.measure.cacheDataSources", defaultValue = true)
 
   /**
-   * Size of thread pool for parallel measure execution.
-   * Defaults to number of processors available to the spark driver JVM.
-   */
-  private val numThreads: Int = sparkSession.sparkContext.getConf
-    .getInt("spark.griffin.measure.parallelism", Runtime.getRuntime.availableProcessors())
-
-  /**
-   * Service to handle threaded execution of tasks (measures).
-   */
-  private implicit val ec: ExecutionContextExecutorService =
-    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))
-
-  /**
    * Starting point of measure execution.
    *
    * @param measureParams Object representation(s) of user defined measure(s).
@@ -98,20 +84,18 @@
         val dataSourceName = measuresForSource._1
         val measureParams = measuresForSource._2
 
-        withCacheIfNecessary(dataSourceName, {
-          val dataSource = sparkSession.read.table(dataSourceName)
+        val dataSource = sparkSession.read.table(dataSourceName)
 
-          if (dataSource.isStreaming) {
-            // TODO this is a no op as streaming queries need to be registered.
+        if (dataSource.isStreaming) {
+          // TODO this is a no op as streaming queries need to be registered.
 
-            dataSource.writeStream
-              .foreachBatch((_, batchId) => {
-                executeMeasures(measureParams, Some(batchId))
-              })
-          } else {
-            executeMeasures(measureParams)
-          }
-        })
+          dataSource.writeStream
+            .foreachBatch((batchDf: Dataset[Row], batchId: Long) => {
+              executeMeasures(batchDf, dataSourceName, measureParams, batchId)
+            })
+        } else {
+          executeMeasures(dataSource, dataSourceName, measureParams)
+        }
       })
   }
 
@@ -121,25 +105,31 @@
    * After the function is complete, the data source is uncached.
    *
    * @param dataSourceName name of data source
+   * @param numMeasures number of measures for each data source
    * @param f function to perform
-   * @param measureCountByDataSource number of measures for each data source
    * @return
    */
-  private def withCacheIfNecessary(dataSourceName: String, f: => Unit)(
-      implicit measureCountByDataSource: Map[String, Int]): Unit = {
-    val numMeasures = measureCountByDataSource(dataSourceName)
+  private def withCacheIfNecessary(
+      dataSourceName: String,
+      numMeasures: Int,
+      dataSource: DataFrame,
+      f: => Unit): Unit = {
     var isCached = false
     if (cacheDataSources && numMeasures > 1) {
-      info(
-        s"Caching data source with name '$dataSourceName' as $numMeasures measures are applied on it.")
-      sparkSession.catalog.cacheTable(dataSourceName)
-      isCached = true
+      if (!dataSource.isStreaming) {
+        info(
+          s"Caching data source with name '$dataSourceName' as $numMeasures measures are applied on it.")
+        dataSource.persist()
+        isCached = true
+      }
     }
 
     f
 
     if (isCached) {
-      sparkSession.catalog.uncacheTable(dataSourceName)
+      info(
+        s"Un-Caching data source with name '$dataSourceName' as measure execution is complete for it.")
+      dataSource.unpersist(true)
     }
   }
 
@@ -155,49 +145,74 @@
    * @param batchId Option batch Id in case of streaming sources to identify micro batches.
    */
   private def executeMeasures(
+      input: DataFrame,
+      dataSourceName: String,
       measureParams: Seq[MeasureParam],
-      batchId: Option[Long] = None): Unit = {
-    val batchDetailsOpt = batchId.map(bId => s"for batch id $bId").getOrElse(StringUtils.EMPTY)
+      batchId: Long = -1L): Unit = {
+    val numMeasures: Int = measureParams.length
 
-    // define the tasks
-    val tasks: Map[String, Future[_]] = (for (i <- measureParams.indices)
-      yield {
-        val measureParam = measureParams(i)
-        val measureName = measureParam.getName
+    withCacheIfNecessary(
+      dataSourceName,
+      numMeasures,
+      input, {
+        import java.util.concurrent.TimeUnit
 
-        (measureName, Future {
-          val currentContext = context.cloneDQContext(ContextId(new Date().getTime))
-          info(s"Started execution of measure with name '$measureName'")
+        import scala.concurrent.duration.Duration
 
-          val measure = createMeasure(measureParam)
-          val (recordsDf, metricsDf) = measure.execute(batchId)
+        // define the tasks
+        val tasks = (for (i <- measureParams.indices)
+          yield {
+            val measureParam = measureParams(i)
+            val measure = createMeasure(measureParam)
+            val measureName = measureParam.getName
 
-          persistMetrics(currentContext, measure, metricsDf)
-          persistRecords(currentContext, measure, recordsDf)
+            (measure, Future {
+              info(s"Started execution of measure with name '$measureName'")
 
-          MetricFlushStep(Some(measureParam)).execute(currentContext)
+              val (recordsDf, metricsDf) = measure.execute(input)
+              val currentContext = context.cloneDQContext(ContextId(new Date().getTime))
+
+              persistMetrics(currentContext, measure, metricsDf)
+              persistRecords(currentContext, measure, recordsDf)
+
+              MetricFlushStep(Some(measureParam)).execute(currentContext)
+            }(ec))
+          }).toMap
+
+        tasks.foreach(task => {
+          val measureName = task._1.measureParam.getName
+
+          task._2.onComplete {
+            case Success(_) =>
+              info(
+                s"Successfully executed measure with name '$measureName' on data source with name " +
+                  s"'$dataSourceName")
+            case Failure(exception) =>
+              error(
+                s"Error occurred while executing measure with name '$measureName' on data source with name " +
+                  s"'$dataSourceName'",
+                exception)
+          }(ec)
         })
-      }).toMap
 
-    tasks.foreach(task =>
-      task._2.onComplete {
-        case Success(_) =>
-          info(s"Successfully executed measure with name '${task._1}' $batchDetailsOpt")
-        case Failure(e) =>
-          error(s"Error executing measure with name '${task._1}' $batchDetailsOpt", e)
+        var deadline = Duration(10, TimeUnit.SECONDS).fromNow
+
+        while (!tasks.forall(_._2.isCompleted)) {
+          if (deadline.isOverdue()) {
+            val unfinishedMeasureNames = tasks
+              .filterNot(_._2.isCompleted)
+              .map(_._1.measureParam.getName)
+              .mkString("['", "', '", "']")
+
+            info(s"Measures with name $unfinishedMeasureNames are still executing.")
+            deadline = Duration(10, TimeUnit.SECONDS).fromNow
+          }
+        }
+
+        info(
+          "Completed execution of all measures for data source with " +
+            s"name '${measureParams.head.getDataSource}'.")
       })
-
-    Thread.sleep(1000)
-
-    while (!tasks.forall(_._2.isCompleted)) {
-      info(
-        s"Measures with name ${tasks.filterNot(_._2.isCompleted).keys.mkString("['", "', '", "']")} " +
-          s"are still executing.")
-      Thread.sleep(1000)
-    }
-
-    info(
-      s"Completed execution of all measures for data source with name '${measureParams.head.getDataSource}'.")
   }
 
   /**
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
index d33efc8..d9192f7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
@@ -17,16 +17,15 @@
 
 package org.apache.griffin.measure.execution.impl
 
-import java.util.Locale
-
 import io.netty.util.internal.StringUtil
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StringType
 
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure
-import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.CommonUtils.safeReduce
 
 /**
  * Accuracy Measure.
@@ -60,7 +59,7 @@
    * @param sourceCol name of source column
    * @param refCol name of reference column
    */
-  case class AccuracyExpr(sourceCol: String, refCol: String)
+  final case class AccuracyExpr(sourceCol: String, refCol: String)
 
   import AccuracyMeasure._
   import Measure._
@@ -100,42 +99,47 @@
    *
    *  @return tuple of records dataframe and metric dataframe
    */
-  override def impl(): (DataFrame, DataFrame) = {
-    val originalSource = sparkSession.read.table(measureParam.getDataSource)
-    val originalCols = originalSource.columns
+  override def impl(input: DataFrame): (DataFrame, DataFrame) = {
+    val originalCols = input.columns
 
-    val dataSource = addColumnPrefix(originalSource, SourcePrefixStr)
+    val dataSource = addColumnPrefix(input, SourcePrefixStr)
 
     val refDataSource =
-      addColumnPrefix(sparkSession.read.table(refSource).drop(ConstantColumns.tmst), refPrefixStr)
+      addColumnPrefix(sparkSession.read.table(refSource), refPrefixStr)
 
-    val accuracyExprs = exprOpt.get
+    val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+    val accuracyExprs = expr
       .map(toAccuracyExpr)
       .distinct
       .map(x => AccuracyExpr(s"$SourcePrefixStr${x.sourceCol}", s"$refPrefixStr${x.refCol}"))
 
-    val joinExpr =
+    val joinExpr = safeReduce(
       accuracyExprs
-        .map(e => col(e.sourceCol) === col(e.refCol))
-        .reduce(_ and _)
+        .map(e => col(e.sourceCol) === col(e.refCol)))(_ and _)
 
     val indicatorExpr =
-      accuracyExprs
-        .map(e => coalesce(col(e.sourceCol), emptyCol) notEqual coalesce(col(e.refCol), emptyCol))
-        .reduce(_ or _)
+      safeReduce(
+        accuracyExprs
+          .map(e =>
+            coalesce(col(e.sourceCol), emptyCol) notEqual coalesce(col(e.refCol), emptyCol)))(
+        _ or _)
 
-    val nullExpr = accuracyExprs.map(e => col(e.sourceCol).isNull).reduce(_ or _)
+    val nullExpr = safeReduce(accuracyExprs.map(e => col(e.sourceCol).isNull))(_ or _)
+
+    val cols = accuracyExprs.map(_.refCol).map(col)
+    val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
 
     val recordsDf = removeColumnPrefix(
       dataSource
-        .join(refDataSource, joinExpr, "left")
+        .join(refDataSource.withColumn(RowNumber, row_number().over(window)), joinExpr, "left")
+        .where(col(RowNumber) === 1 or col(RowNumber).isNull)
         .withColumn(valueColumn, when(indicatorExpr or nullExpr, 1).otherwise(0)),
       SourcePrefixStr)
       .select((originalCols :+ valueColumn).map(col): _*)
 
     val selectCols =
       Seq(Total, AccurateStr, InAccurateStr).map(e =>
-        map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+        map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
     val metricColumn: Column = array(selectCols: _*).as(valueColumn)
 
     val metricDf = recordsDf
@@ -165,8 +169,8 @@
    * Validates if the expression is not null and non empty along with some dataset specific validations.
    */
   override def validate(): Unit = {
-    assert(exprOpt.isDefined, s"'$Expression' must be defined.")
-    assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
+    val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+    assert(expr.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
 
     assert(
       !StringUtil.isNullOrEmpty(refSource),
@@ -178,12 +182,10 @@
 
     val datasourceName = measureParam.getDataSource
 
-    val dataSourceCols =
-      sparkSession.read.table(datasourceName).columns.map(_.toLowerCase(Locale.ROOT)).toSet
-    val refDataSourceCols =
-      sparkSession.read.table(refSource).columns.map(_.toLowerCase(Locale.ROOT)).toSet
+    val dataSourceCols = sparkSession.read.table(datasourceName).columns.toSet
+    val refDataSourceCols = sparkSession.read.table(refSource).columns.toSet
 
-    val accuracyExpr = exprOpt.get.map(toAccuracyExpr).distinct
+    val accuracyExpr = expr.map(toAccuracyExpr).distinct
     val (forDataSource, forRefDataSource) =
       accuracyExpr
         .map(
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index fbbadc6..fabe166 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -39,15 +39,10 @@
 case class CompletenessMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
     extends Measure {
 
+  import CompletenessMeasure._
   import Measure._
 
   /**
-   * Completeness Constants
-   */
-  final val Complete: String = "complete"
-  final val InComplete: String = "incomplete"
-
-  /**
    * Completeness measure supports record and metric write
    */
   override val supportsRecordWrite: Boolean = true
@@ -80,16 +75,15 @@
    *
    *  @return tuple of records dataframe and metric dataframe
    */
-  override def impl(): (DataFrame, DataFrame) = {
-    val exprStr = exprOpt.get
+  override def impl(input: DataFrame): (DataFrame, DataFrame) = {
+    val exprStr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
 
     val selectCols =
       Seq(Total, Complete, InComplete).map(e =>
-        map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+        map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
     val metricColumn: Column = array(selectCols: _*).as(valueColumn)
 
-    val input = sparkSession.read.table(measureParam.getDataSource)
-    val badRecordsDf = input.withColumn(valueColumn, when(expr(exprStr), 1).otherwise(0))
+    val badRecordsDf = input.withColumn(valueColumn, when(expr(exprStr), 0).otherwise(1))
 
     val metricDf = badRecordsDf
       .withColumn(Total, lit(1))
@@ -104,9 +98,16 @@
    * Validates if expression is defined and is non empty.
    */
   override def validate(): Unit = {
-    assert(exprOpt.isDefined, s"'$Expression' must be defined.")
-    assert(exprOpt.nonEmpty, s"'$Expression' must not be empty.")
-
-    assert(!StringUtil.isNullOrEmpty(exprOpt.get), s"'$Expression' must not be null or empty.")
+    val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+    assert(!StringUtil.isNullOrEmpty(expr), s"'$Expression' must not be null or empty.")
   }
 }
+
+object CompletenessMeasure {
+
+  /**
+   * Completeness Constants
+   */
+  final val Complete: String = "complete"
+  final val InComplete: String = "incomplete"
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
index 359a80e..12588cd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -26,6 +26,7 @@
 
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure
+import org.apache.griffin.measure.utils.CommonUtils.safeReduce
 
 /**
  * Duplication Measure.
@@ -95,38 +96,33 @@
    *
    *  @return tuple of records dataframe and metric dataframe
    */
-  override def impl(): (DataFrame, DataFrame) = {
-    val input = sparkSession.read.table(measureParam.getDataSource)
+  override def impl(input: DataFrame): (DataFrame, DataFrame) = {
     val cols = keyCols(input).map(col)
 
-    val isNullCol = cols.map(x => x.isNull).reduce(_ and _)
-    val duplicateCol = when(col(__Temp) > 1, 1).otherwise(0)
-    val uniqueCol = when(not(isNullCol) and col(Unique) === 1, 1).otherwise(0)
-    val distinctCol =
-      when(not(isNullCol) and (col(Unique) === 1 or col(NonUnique) === 1), 1).otherwise(0)
-    val nonUniqueCol =
-      when(not(isNullCol) and col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1)
-        .otherwise(0)
+    val isNullCol = safeReduce(cols.map(x => x.isNull))(_ and _)
+    val uniqueCol = condition(col(Count) === 1)
+    val nonUniqueCol = condition(col(Count) > 1 and col(RowNumber) === 1)
+    val duplicateCol = condition(col(Count) > 1 and col(RowNumber) > 1)
+    val distinctCol = condition(col(Unique) === 1 or col(NonUnique) === 1)
 
     val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
 
     val aggDf = input
-      .select(col(AllColumns), row_number().over(window).as(__Temp))
-      .withColumn(IsNull, isNullCol)
-      .withColumn(Duplicate, duplicateCol)
-      .withColumn(Unique, count(lit(1)).over(window))
+      .withColumn(IsNotNull, not(isNullCol))
+      .withColumn(RowNumber, row_number().over(window))
+      .withColumn(Count, count(lit(1)).over(window))
       .withColumn(Unique, uniqueCol)
-      .withColumn(NonUnique, min(__Temp).over(window))
       .withColumn(NonUnique, nonUniqueCol)
+      .withColumn(Duplicate, duplicateCol)
       .withColumn(Distinct, distinctCol)
       .withColumn(Total, lit(1))
       .withColumn(valueColumn, col(badnessExpr))
-      .drop(__Temp, IsNull)
+      .drop(IsNotNull, RowNumber, Count)
 
     val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
 
     val selectCols = duplicationMeasures.map(e =>
-      map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+      map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
     val metricColumn: Column = array(selectCols: _*).as(valueColumn)
 
     val metricDf = aggDf
@@ -160,6 +156,11 @@
     }, s"Invalid value '$badnessExpr' was provided for $BadRecordDefinition")
   }
 
+  private def condition(c: Column, checkNotNull: Boolean = true): Column = {
+    val notNullExpr = if (checkNotNull) col(IsNotNull) else lit(true)
+    when(notNullExpr and c, 1).otherwise(0)
+  }
+
   private def keyCols(input: DataFrame): Array[String] = {
     if (StringUtil.isNullOrEmpty(exprs)) input.columns
     else exprs.split(",").map(_.trim)
@@ -171,10 +172,11 @@
  * Duplication measure constants
  */
 object DuplicationMeasure {
-  final val IsNull: String = "is_null"
   final val Duplicate: String = "duplicate"
   final val Unique: String = "unique"
   final val NonUnique: String = "non_unique"
   final val Distinct: String = "distinct"
-  final val __Temp: String = "__temp"
+
+  final val IsNotNull: String = "is_not_null"
+  final val Count: String = "count"
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
index 52d8bff..fedfe37 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
@@ -17,8 +17,7 @@
 
 package org.apache.griffin.measure.execution.impl
 
-import java.util.Locale
-
+import io.netty.util.internal.StringUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -26,7 +25,6 @@
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure
 import org.apache.griffin.measure.execution.Measure._
-import org.apache.griffin.measure.step.builder.ConstantColumns
 
 /**
  * Profiling measure.
@@ -71,12 +69,12 @@
    * profiling measure is to be executed. `expr` is an optional key for Profiling measure,
    * i.e., if it is not defined, all columns in the data set will be profiled.
    */
-  val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
+  val exprs: String = getFromConfig[String](Expression, null)
 
   /**
-   * The value for this key is boolean. If this is `true`, the distinct counts will be approximated
-   * to allow up to 5% error. Approximate counts are usually faster by are less accurate. If this is set
-   * to `false`, then the counts will be 100% accurate.
+   * Several resultant metrics of profiling measure are floating-point numbers. This key controls to extent
+   * to which these floating-point numbers are rounded. For example, if `round.scale = 2` then all
+   * floating-point metric values will be rounded to 2 decimal places.
    */
   val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)
 
@@ -91,13 +89,15 @@
   val dataSetSample: Double = getFromConfig[java.lang.Double](DataSetSampleStr, 1.0)
 
   /**
-   * Several resultant metrics of profiling measure are floating-point numbers. This key controls to extent
-   * to which these floating-point numbers are rounded. For example, if `round.scale = 2` then all
-   * floating-point metric values will be rounded to 2 decimal places.
+   * The value for this key is boolean. If this is `true`, the distinct counts will be approximated
+   * to allow up to 5% error. Approximate counts are usually faster by are less accurate. If this is set
+   * to `false`, then the counts will be 100% accurate.
    */
   val approxDistinctCount: Boolean =
     getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)
 
+  validate()
+
   /**
    * Various metrics are calculated for columns of the data set. If expr is correctly defined,
    * then metrics are generated for only the given subset of columns else, its generated for all columns.
@@ -118,26 +118,17 @@
    *
    *  @return tuple of records dataframe and metric dataframe
    */
-  override def impl(): (DataFrame, DataFrame) = {
+  override def impl(dataSource: DataFrame): (DataFrame, DataFrame) = {
     info(s"Selecting random ${dataSetSample * 100}% of the rows for profiling.")
-    val input = sparkSession.read.table(measureParam.getDataSource).sample(dataSetSample)
-    val profilingColNames = exprOpt
-      .getOrElse(input.columns.mkString(","))
-      .split(",")
-      .map(_.trim.toLowerCase(Locale.ROOT))
-      .toSet
+    val input = dataSource.sample(dataSetSample)
+    val profilingColNames = keyCols(input)
 
-    val profilingCols =
-      input.schema.fields.filter(f =>
-        profilingColNames.contains(f.name) && !f.name.equalsIgnoreCase(ConstantColumns.tmst))
-
-    assert(
-      profilingCols.nonEmpty,
-      s"Invalid columns [${profilingCols.map(_.name).mkString(", ")}] were provided for profiling.")
+    val profilingCols = input.schema.fields.filter(f => profilingColNames.contains(f.name))
 
     val profilingExprs = profilingCols.foldLeft(Array.empty[Column])((exprList, field) => {
       val colName = field.name
-      val profilingExprs = getProfilingExprs(field, roundScale, approxDistinctCount)
+      val profilingExprs =
+        getProfilingExprs(field, roundScale, approxDistinctCount, dataSetSample).map(nullToZero)
 
       exprList.:+(map(profilingExprs: _*).as(s"$DetailsPrefix$colName"))
     })
@@ -165,6 +156,24 @@
     (sparkSession.emptyDataFrame, metricDf)
   }
 
+  override def validate(): Unit = {
+    val input = sparkSession.read.table(measureParam.getDataSource)
+    val kc = keyCols(input)
+
+    assert(kc.nonEmpty, s"Columns defined in '$Expression' is empty.")
+    kc.foreach(c =>
+      assert(input.columns.contains(c), s"Provided column '$c' does not exist in the dataset."))
+
+    assert(
+      dataSetSample > 0.0d && dataSetSample <= 1.0d,
+      "Sample fraction of rows must be in range [0.0, 1.0].")
+  }
+
+  private def keyCols(input: DataFrame): Array[String] = {
+    if (StringUtil.isNullOrEmpty(exprs)) input.columns
+    else exprs.split(",").map(_.trim)
+  }.distinct
+
 }
 
 /**
@@ -227,7 +236,8 @@
   private def getProfilingExprs(
       field: StructField,
       roundScale: Int,
-      approxDistinctCount: Boolean): Seq[Column] = {
+      approxDistinctCount: Boolean,
+      dataSetSample: Double): Seq[Column] = {
     val colName = field.name
     val colType = field.dataType
 
@@ -243,12 +253,16 @@
         (lit(DistinctCount), countDistinct(column).as(DistinctCount))
       }
 
+    val distinctExpr = if (dataSetSample == 1) {
+      Seq(lit(distinctCountName), distinctCountExpr)
+    } else Nil
+
     Seq(
       Seq(lit(DataTypeStr), lit(colType.catalogString).as(DataTypeStr)),
       Seq(lit(Total), sum(lit(1)).as(Total)),
       Seq(lit(MinColLength), min(lengthColExpr).as(MinColLength)),
       Seq(lit(MaxColLength), max(lengthColExpr).as(MaxColLength)),
-      Seq(lit(AvgColLength), forNumericFn(colType, avg(lengthColExpr), AvgColLength)),
+      Seq(lit(AvgColLength), avg(lengthColExpr).as(AvgColLength)),
       Seq(lit(Min), forNumericFn(colType, min(column), Min)),
       Seq(lit(Max), forNumericFn(colType, max(column), Max)),
       Seq(lit(Avg), forNumericFn(colType, bround(avg(column), roundScale), Avg)),
@@ -257,7 +271,7 @@
         forNumericFn(colType, bround(stddev(column), roundScale), StdDeviation)),
       Seq(lit(Variance), forNumericFn(colType, bround(variance(column), roundScale), Variance)),
       Seq(lit(Kurtosis), forNumericFn(colType, bround(kurtosis(column), roundScale), Kurtosis)),
-      Seq(lit(distinctCountName), distinctCountExpr),
+      distinctExpr,
       Seq(lit(NullCount), sum(nullColExpr).as(NullCount))).flatten
   }
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
index 2111d6a..3247b9b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
@@ -17,14 +17,13 @@
 
 package org.apache.griffin.measure.execution.impl
 
-import java.util.Locale
-
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataType, StringType}
 
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure
+import org.apache.griffin.measure.utils.CommonUtils.safeReduce
 
 /**
  * SchemaConformance Measure.
@@ -43,6 +42,7 @@
 case class SchemaConformanceMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
     extends Measure {
 
+  import CompletenessMeasure._
   import Measure._
 
   /**
@@ -51,15 +51,13 @@
    * @param sourceCol name of source column
    * @param dataType name of reference column
    */
-  case class SchemaConformanceExpr(sourceCol: String, dataType: DataType)
+  final case class SchemaConformanceExpr(sourceCol: String, dataType: DataType)
 
   /**
    * SchemaConformance Constants
    */
   final val SourceColStr: String = "source.col"
   final val DataTypeStr: String = "type"
-  final val Complete: String = "complete"
-  final val InComplete: String = "incomplete"
 
   /**
    * SchemaConformance measure supports record and metric write
@@ -90,19 +88,21 @@
    *
    *  @return tuple of records dataframe and metric dataframe
    */
-  override def impl(): (DataFrame, DataFrame) = {
-    val givenExprs = exprOpt.get.map(toSchemaConformanceExpr).distinct
+  override def impl(input: DataFrame): (DataFrame, DataFrame) = {
+    val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+    val givenExprs = expr.map(toSchemaConformanceExpr).distinct
 
-    val incompleteExpr = givenExprs
-      .map(e => when(col(e.sourceCol).cast(e.dataType).isNull, true).otherwise(false))
-      .reduce(_ or _)
+    val incompleteExpr = safeReduce(
+      givenExprs
+        .map(e =>
+          when(col(e.sourceCol).cast(StringType).cast(e.dataType).isNull, true)
+            .otherwise(false)))(_ or _)
 
     val selectCols =
       Seq(Total, Complete, InComplete).map(e =>
-        map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+        map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
     val metricColumn: Column = array(selectCols: _*).as(valueColumn)
 
-    val input = sparkSession.read.table(measureParam.getDataSource)
     val badRecordsDf = input.withColumn(valueColumn, when(incompleteExpr, 1).otherwise(0))
 
     val metricDf = badRecordsDf
@@ -132,14 +132,13 @@
    * Validates if the expression is not null and non empty along with some dataset specific validations.
    */
   override def validate(): Unit = {
-    assert(exprOpt.isDefined, s"'$Expression' must be defined.")
-    assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
+    val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+    assert(expr.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
 
     val datasourceName = measureParam.getDataSource
 
-    val dataSourceCols =
-      sparkSession.read.table(datasourceName).columns.map(_.toLowerCase(Locale.ROOT)).toSet
-    val schemaConformanceExprExpr = exprOpt.get.map(toSchemaConformanceExpr).distinct
+    val dataSourceCols = sparkSession.read.table(datasourceName).columns.toSet
+    val schemaConformanceExprExpr = expr.map(toSchemaConformanceExpr).distinct
 
     val forDataSource =
       schemaConformanceExprExpr.map(e => (e.sourceCol, dataSourceCols.contains(e.sourceCol)))
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
index 0cdc090..2e79257 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
@@ -43,15 +43,10 @@
 case class SparkSQLMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
     extends Measure {
 
+  import CompletenessMeasure._
   import Measure._
 
   /**
-   * SparkSQL measure constants
-   */
-  final val Complete: String = "complete"
-  final val InComplete: String = "incomplete"
-
-  /**
    * SparkSQL measure supports record and metric write
    */
   override val supportsRecordWrite: Boolean = true
@@ -87,8 +82,8 @@
    *
    *  @return tuple of records dataframe and metric dataframe
    */
-  override def impl(): (DataFrame, DataFrame) = {
-    val df = sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))
+  override def impl(dataSource: DataFrame): (DataFrame, DataFrame) = {
+    val df = dataSource.sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))
 
     assert(
       df.schema.exists(f => f.name.matches(valueColumn) && f.dataType.isInstanceOf[BooleanType]),
@@ -96,7 +91,7 @@
 
     val selectCols =
       Seq(Total, Complete, InComplete).map(e =>
-        map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+        map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
     val metricColumn: Column = array(selectCols: _*).as(valueColumn)
 
     val badRecordsDf = df.withColumn(valueColumn, when(col(valueColumn), 1).otherwise(0))
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 48231ac..6d6200a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -17,8 +17,9 @@
 
 package org.apache.griffin.measure.launch.batch
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Executors, TimeUnit}
 
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
 import scala.util.Try
 
 import org.apache.spark.SparkConf
@@ -85,7 +86,18 @@
 
           if (dqParam.getMeasures != null && dqParam.getMeasures.nonEmpty) {
             Try {
-              MeasureExecutor(dqContext).execute(dqParam.getMeasures)
+              // Size of thread pool for parallel measure execution.
+              // Defaults to number of processors available to the spark driver JVM.
+              val numThreads: Int = sparkSession.sparkContext.getConf
+                .getInt(
+                  "spark.griffin.measure.parallelism",
+                  Runtime.getRuntime.availableProcessors())
+
+              // Service to handle threaded execution of tasks (measures).
+              val ec: ExecutionContextExecutorService =
+                ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))
+
+              MeasureExecutor(dqContext, ec).execute(dqParam.getMeasures)
               true
             }
           } else {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
index 268b3a0..08bfb9b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
@@ -41,4 +41,17 @@
 
     result
   }
+
+  /**
+   *
+   * @param t `Traversable`
+   * @param op operation
+   * @tparam T1 any type
+   * @return instance of `T1`
+   */
+  def safeReduce[T1](t: Iterable[T1])(op: (T1, T1) => T1): T1 = {
+    val head :: tail = t.toList
+    tail.fold(head)(op)
+  }
+
 }
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
index 2a7f6a4..9f0f748 100644
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -21,7 +21,7 @@
       "type": "completeness",
       "data.source": "source",
       "config": {
-        "expr": "email is null or post_code is null or first_name is null"
+        "expr": "email is not null and post_code is not null and first_name is not null"
       },
       "out": [
         {
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
index 2f81d1c..206761f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
@@ -136,7 +136,7 @@
 
   it should "execute defined measure expr" in {
     val measure = AccuracyMeasure(spark, param)
-    val (recordsDf, metricsDf) = measure.execute(None)
+    val (recordsDf, metricsDf) = measure.execute(source)
 
     assertResult(recordDfSchema)(recordsDf.schema)
     assertResult(metricDfSchema)(metricsDf.schema)
@@ -155,4 +155,27 @@
     assertResult(metricMap(InAccurateStr))("3")
   }
 
+  it should "support space in column name" in {
+    val newSource = source.withColumnRenamed("name", "first name")
+    newSource.createOrReplaceTempView("newSource")
+    val measure = AccuracyMeasure(spark, param)
+    val (recordsDf, metricsDf) = measure.execute(newSource)
+
+    assertResult(newSource.schema.add(Status, "string", nullable = false))(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
+
+    assertResult(newSource.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
+
+    val metricMap = metricsDf
+      .head()
+      .getAs[Seq[Map[String, String]]](Metrics)
+      .map(x => x(MetricName) -> x(MetricValue))
+      .toMap
+
+    assertResult(metricMap(Total))("5")
+    assertResult(metricMap(AccurateStr))("2")
+    assertResult(metricMap(InAccurateStr))("3")
+  }
+
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
index d6e8b74..db0127c 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
@@ -23,13 +23,14 @@
 
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.execution.impl.CompletenessMeasure._
 
 class CompletenessMeasureTest extends MeasureTest {
   var param: MeasureParam = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    param = MeasureParam("param", "Completeness", "source", Map(Expression -> "name is null"))
+    param = MeasureParam("param", "Completeness", "source", Map(Expression -> "name is not null"))
   }
 
   "CompletenessMeasure" should "validate expression config" in {
@@ -61,8 +62,8 @@
   }
 
   it should "execute defined measure expr" in {
-    val measure = CompletenessMeasure(spark, param)
-    val (recordsDf, metricsDf) = measure.execute(None)
+    val measure = new CompletenessMeasure(spark, param)
+    val (recordsDf, metricsDf) = measure.execute(source)
 
     assertResult(recordDfSchema)(recordsDf.schema)
     assertResult(metricDfSchema)(metricsDf.schema)
@@ -77,15 +78,15 @@
       .toMap
 
     assertResult(metricMap(Total))("5")
-    assertResult(metricMap(measure.Complete))("4")
-    assertResult(metricMap(measure.InComplete))("1")
+    assertResult(metricMap(Complete))("4")
+    assertResult(metricMap(InComplete))("1")
   }
 
   it should "supported complex measure expr" in {
-    val measure = CompletenessMeasure(
+    val measure = new CompletenessMeasure(
       spark,
-      param.copy(config = Map(Expression -> "name is null or gender is null")))
-    val (recordsDf, metricsDf) = measure.execute(None)
+      param.copy(config = Map(Expression -> "name is not null and gender is not null")))
+    val (recordsDf, metricsDf) = measure.execute(source)
 
     assertResult(recordDfSchema)(recordsDf.schema)
     assertResult(metricDfSchema)(metricsDf.schema)
@@ -100,19 +101,19 @@
       .toMap
 
     assertResult(metricMap(Total))("5")
-    assertResult(metricMap(measure.Complete))("3")
-    assertResult(metricMap(measure.InComplete))("2")
+    assertResult(metricMap(Complete))("3")
+    assertResult(metricMap(InComplete))("2")
   }
 
   it should "throw runtime error for invalid expr" in {
     assertThrows[AnalysisException] {
-      CompletenessMeasure(spark, param.copy(config = Map(Expression -> "xyz is null")))
-        .execute()
+      new CompletenessMeasure(spark, param.copy(config = Map(Expression -> "xyz is null")))
+        .execute(source)
     }
 
     assertThrows[ParseException] {
-      CompletenessMeasure(spark, param.copy(config = Map(Expression -> "select 1")))
-        .execute()
+      new CompletenessMeasure(spark, param.copy(config = Map(Expression -> "select 1")))
+        .execute(source)
     }
   }
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
index 7e22617..f807434 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -107,7 +107,7 @@
   it should "execute defined measure expr" in {
     val measure =
       DuplicationMeasure(spark, param.copy(config = Map(BadRecordDefinition -> "duplicate")))
-    val (recordsDf, metricsDf) = measure.execute(None)
+    val (recordsDf, metricsDf) = measure.execute(source)
 
     assertResult(recordDfSchema)(recordsDf.schema)
     assertResult(metricDfSchema)(metricsDf.schema)
@@ -128,11 +128,11 @@
     assertResult(metricMap(Total))("5")
   }
 
-  it should "supported complex measure expr" in {
+  it should "support complex measure expr" in {
     val measure = DuplicationMeasure(
       spark,
       param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "duplicate")))
-    val (recordsDf, metricsDf) = measure.execute(None)
+    val (recordsDf, metricsDf) = measure.execute(source)
 
     assertResult(recordDfSchema)(recordsDf.schema)
     assertResult(metricDfSchema)(metricsDf.schema)
@@ -153,4 +153,34 @@
     assertResult(metricMap(Total))("5")
   }
 
+  it should "support columns with space in their names" in {
+    val newSource = source.withColumnRenamed("name", "full name")
+    newSource.createOrReplaceTempView("newSource")
+    val measure = DuplicationMeasure(
+      spark,
+      param.copy(
+        dataSource = "newSource",
+        config = Map(Expression -> "full name", BadRecordDefinition -> "duplicate")))
+
+    val (recordsDf, metricsDf) = measure.execute(newSource)
+
+    assertResult(newSource.schema.add(Status, "string", nullable = false))(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
+
+    assertResult(newSource.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
+
+    val metricMap = metricsDf
+      .head()
+      .getAs[Seq[Map[String, String]]](Metrics)
+      .map(x => x(MetricName) -> x(MetricValue))
+      .toMap
+
+    assertResult(metricMap(Duplicate))("1")
+    assertResult(metricMap(Unique))("2")
+    assertResult(metricMap(NonUnique))("1")
+    assertResult(metricMap(Distinct))("3")
+    assertResult(metricMap(Total))("5")
+  }
+
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
index e4f1b21..d32730f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
@@ -70,7 +70,7 @@
     assertResult(3)(measure.roundScale)
     assertResult(true)(measure.approxDistinctCount)
 
-    val (_, metricsDf) = measure.execute(None)
+    val (_, metricsDf) = measure.execute(source)
 
     assertResult(1L)(metricsDf.count())
 
@@ -86,7 +86,7 @@
     assertResult(3)(measure.roundScale)
     assertResult(false)(measure.approxDistinctCount)
 
-    val (_, metricsDf) = measure.execute(None)
+    val (_, metricsDf) = measure.execute(source)
 
     assertResult(1L)(metricsDf.count())
 
@@ -97,7 +97,7 @@
   it should "throw runtime error for invalid expr" in {
     assertThrows[AssertionError] {
       ProfilingMeasure(spark, param.copy(config = Map(Expression -> "xyz")))
-        .execute()
+        .execute(source)
     }
   }
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala
index e705675..6c37f34 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala
@@ -18,9 +18,11 @@
 package org.apache.griffin.measure.execution.impl
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.functions.lit
 
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.execution.impl.CompletenessMeasure._
 
 class SchemaConformanceMeasureTest extends MeasureTest {
   var param: MeasureParam = _
@@ -102,7 +104,7 @@
 
   it should "execute defined measure expr" in {
     val measure = SchemaConformanceMeasure(spark, param)
-    val (recordsDf, metricsDf) = measure.execute(None)
+    val (recordsDf, metricsDf) = measure.execute(source)
 
     assertResult(recordDfSchema)(recordsDf.schema)
     assertResult(metricDfSchema)(metricsDf.schema)
@@ -117,8 +119,65 @@
       .toMap
 
     assertResult(metricMap(Total))("5")
-    assertResult(metricMap(measure.Complete))("5")
-    assertResult(metricMap(measure.InComplete))("0")
+    assertResult(metricMap(Complete))("5")
+    assertResult(metricMap(InComplete))("0")
+  }
+
+  it should "be able to check for date column" in {
+    val newSource = source.withColumn("Test_date", lit("2009-07-30"))
+    newSource.createOrReplaceTempView("newSource")
+    val measure = SchemaConformanceMeasure(
+      spark,
+      param.copy(
+        dataSource = "newSource",
+        config = Map(Expression -> Seq(Map(SourceColStr -> "Test_date", DataTypeStr -> "date")))))
+    val (recordsDf, metricsDf) = measure.execute(newSource)
+
+    val newRecordDfSchema = newSource.schema.add(Status, "string", nullable = false)
+    assertResult(newRecordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
+
+    assertResult(newSource.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
+
+    val metricMap = metricsDf
+      .head()
+      .getAs[Seq[Map[String, String]]](Metrics)
+      .map(x => x(MetricName) -> x(MetricValue))
+      .toMap
+
+    assertResult(metricMap(Total))("5")
+    assertResult(metricMap(Complete))("5")
+    assertResult(metricMap(InComplete))("0")
+  }
+
+  it should "be able to check for timestamp column and support space in column name" in {
+    val newSource = source.withColumn("Test date", lit("2009-07-30"))
+    newSource.createOrReplaceTempView("newSource")
+    val measure = SchemaConformanceMeasure(
+      spark,
+      param.copy(
+        dataSource = "newSource",
+        config =
+          Map(Expression -> Seq(Map(SourceColStr -> "Test date", DataTypeStr -> "timestamp")))))
+    val (recordsDf, metricsDf) = measure.execute(newSource)
+
+    val newRecordDfSchema = newSource.schema.add(Status, "string", nullable = false)
+    assertResult(newRecordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
+
+    assertResult(newSource.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
+
+    val metricMap = metricsDf
+      .head()
+      .getAs[Seq[Map[String, String]]](Metrics)
+      .map(x => x(MetricName) -> x(MetricValue))
+      .toMap
+
+    assertResult(metricMap(Total))("5")
+    assertResult(metricMap(Complete))("5")
+    assertResult(metricMap(InComplete))("0")
   }
 
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
index 405e4a6..a2986e0 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
@@ -22,6 +22,7 @@
 
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.execution.impl.CompletenessMeasure._
 
 class SparkSqlMeasureTest extends MeasureTest {
   var param: MeasureParam = _
@@ -99,7 +100,7 @@
 
   it should "execute defined measure expr" in {
     val measure = SparkSQLMeasure(spark, param)
-    val (recordsDf, metricsDf) = measure.execute(None)
+    val (recordsDf, metricsDf) = measure.execute(source)
 
     assertResult(metricDfSchema)(metricsDf.schema)
     assertResult(source.count())(recordsDf.count())
@@ -112,8 +113,8 @@
       .toMap
 
     assertResult(metricMap(Total))("5")
-    assertResult(metricMap(measure.Complete))("3")
-    assertResult(metricMap(measure.InComplete))("2")
+    assertResult(metricMap(Complete))("3")
+    assertResult(metricMap(InComplete))("2")
   }
 
   it should "throw runtime error for invalid expr" in {
@@ -122,7 +123,7 @@
         spark,
         param.copy(config =
           Map(Expression -> "select * from source", BadRecordDefinition -> "name")))
-        .execute()
+        .execute(source)
     }
 
     assertThrows[AnalysisException] {
@@ -130,7 +131,7 @@
         spark,
         param.copy(config =
           Map(Expression -> "select 1 as my_value", BadRecordDefinition -> "name is null")))
-        .execute()
+        .execute(source)
     }
   }
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index b4f11cd..c7d002b 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -147,18 +147,18 @@
         "max" -> "10013",
         "min_col_len" -> "5"),
       "first_name" -> Map(
-        "avg_col_len" -> null,
+        "avg_col_len" -> "6.0",
         "max_col_len" -> "6",
-        "variance" -> null,
-        "kurtosis" -> null,
-        "avg" -> null,
-        "min" -> null,
+        "variance" -> "0",
+        "kurtosis" -> "0",
+        "avg" -> "0",
+        "min" -> "0",
         "null_count" -> "0",
         "approx_distinct_count" -> "13",
         "total" -> "13",
-        "std_dev" -> null,
+        "std_dev" -> "0",
         "data_type" -> "string",
-        "max" -> null,
+        "max" -> "0",
         "min_col_len" -> "6"))
 
     runAndCheckResult(Map("profiling_measure" -> expectedMetrics))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index e24bcd5..5e374c9 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -17,7 +17,7 @@
 
 package org.apache.griffin.measure.job
 
-import scala.util.{Failure, Success}
+import scala.util._
 
 import org.scalatest.matchers.should._
 
@@ -38,11 +38,9 @@
   var dqApp: DQApp = _
 
   def getConfigFilePath(fileName: String): String = {
-    try {
-      getClass.getResource(fileName).getFile
-    } catch {
-      case _: NullPointerException => throw new Exception(s"resource [$fileName] not found")
-      case ex: Throwable => throw ex
+    Try(getClass.getResource(fileName).getFile).toOption match {
+      case Some(n) => n
+      case None => throw new Exception(s"resource [$fileName] not found")
     }
   }
 
diff --git a/pom.xml b/pom.xml
index e51c94e..d2d4ffe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
         <java.version>1.8</java.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala211.binary.version>2.11</scala211.binary.version>
-        <scala.version>${scala.binary.version}.0</scala.version>
+        <scala.version>${scala.binary.version}.12</scala.version>
 
         <maven.compiler.source>${java.version}</maven.compiler.source>
         <maven.compiler.target>${java.version}</maven.compiler.target>