[GRIFFIN-358] Added SparkSqlMeasureTest
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
index fec0d02..b32aa42 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
@@ -13,31 +13,27 @@
 
   import Measure._
 
-  private final val Complete: String = "complete"
-  private final val InComplete: String = "incomplete"
+  final val Complete: String = "complete"
+  final val InComplete: String = "incomplete"
 
   override val supportsRecordWrite: Boolean = true
 
   override val supportsMetricWrite: Boolean = true
 
+  private val expr = getFromConfig[String](Expression, StringUtils.EMPTY)
   private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)
 
+  validate()
+
   override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
-
-    val expr = getFromConfig[String](Expression, StringUtils.EMPTY)
-
-    assert(!StringUtil.isNullOrEmpty(expr), "Invalid query provided as expr.")
-    assert(
-      !StringUtil.isNullOrEmpty(badnessExpr),
-      "Invalid condition provided as bad.record.definition.")
-
     val df = sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))
 
     assert(
       df.schema.exists(f => f.name.matches(valueColumn) && f.dataType.isInstanceOf[BooleanType]),
       s"Invalid condition provided as $BadRecordDefinition. Does not yield a boolean result.")
 
-    val selectCols = Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e)))
+    val selectCols =
+      Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e).cast("string")))
     val metricColumn: Column = map(selectCols: _*).as(valueColumn)
 
     val badRecordsDf = df.withColumn(valueColumn, when(col(valueColumn), 1).otherwise(0))
@@ -49,4 +45,13 @@
 
     (badRecordsDf, metricDf)
   }
+
+  private def validate(): Unit = {
+    assert(
+      !StringUtil.isNullOrEmpty(expr),
+      "Invalid query provided as expr. Must not be null, empty or of invalid type.")
+    assert(
+      !StringUtil.isNullOrEmpty(badnessExpr),
+      "Invalid condition provided as bad.record.definition.")
+  }
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
index baec723..492bd1b 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
@@ -19,6 +19,7 @@
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.parser.ParseException
 
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.execution.Measure._
@@ -108,6 +109,10 @@
         .execute(context)
     }
 
+    assertThrows[ParseException] {
+      CompletenessMeasure(param.copy(config = Map(Expression -> "select 1")))
+        .execute(context)
+    }
   }
 
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
new file mode 100644
index 0000000..eed4b46
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.AnalysisException
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure._
+
+class SparkSqlMeasureTest extends MeasureTest {
+  var param: MeasureParam = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    param = MeasureParam(
+      "param",
+      "SparkSql",
+      "source",
+      Map(
+        Expression -> "select * from source",
+        BadRecordDefinition -> "name is null or gender is null"))
+  }
+
+  "SparkSqlMeasure" should "validate expression config" in {
+
+    // Validations for Expression
+
+    // Empty
+    assertThrows[AssertionError] {
+      SparkSQLMeasure(param.copy(config = Map.empty[String, String]))
+    }
+
+    // Empty
+    assertThrows[AssertionError] {
+      SparkSQLMeasure(param.copy(config = Map(Expression -> StringUtils.EMPTY)))
+    }
+
+    // Null
+    assertThrows[AssertionError] {
+      SparkSQLMeasure(param.copy(config = Map(Expression -> null)))
+    }
+
+    // Incorrect Type
+    assertThrows[AssertionError] {
+      SparkSQLMeasure(param.copy(config = Map(Expression -> 943)))
+    }
+
+    // Validations for BadRecordDefinition
+
+    // Empty
+    assertThrows[AssertionError] {
+      SparkSQLMeasure(
+        param.copy(
+          config = Map(Expression -> "select 1", BadRecordDefinition -> StringUtils.EMPTY)))
+    }
+
+    // Incorrect Type
+    assertThrows[AssertionError] {
+      SparkSQLMeasure(
+        param.copy(config = Map(Expression -> "select 1", BadRecordDefinition -> 2344)))
+    }
+
+    // Null
+    assertThrows[AssertionError] {
+      SparkSQLMeasure(
+        param.copy(config = Map(Expression -> "select 1", BadRecordDefinition -> null)))
+    }
+
+  }
+
+  it should "support metric writing" in {
+    val measure = SparkSQLMeasure(param)
+    assertResult(true)(measure.supportsMetricWrite)
+  }
+
+  it should "support record writing" in {
+    val measure = SparkSQLMeasure(param)
+    assertResult(true)(measure.supportsRecordWrite)
+  }
+
+  it should "execute defined measure expr" in {
+    val measure = SparkSQLMeasure(param)
+    val (recordsDf, metricsDf) = measure.execute(context, None)
+
+    assertResult(metricsDf.schema)(metricDfSchema)
+
+    assertResult(recordsDf.count())(source.count())
+    assertResult(metricsDf.count())(1L)
+
+    val row = metricsDf.head()
+    assertResult(param.getDataSource)(row.getAs[String](DataSource))
+    assertResult(param.getName)(row.getAs[String](MeasureName))
+    assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+    val metricMap = row.getAs[Map[String, String]](Metrics)
+    assertResult(metricMap(Total))("5")
+    assertResult(metricMap(measure.Complete))("3")
+    assertResult(metricMap(measure.InComplete))("2")
+  }
+
+  it should "throw runtime error for invalid expr" in {
+    assertThrows[AssertionError] {
+      SparkSQLMeasure(
+        param.copy(
+          config = Map(Expression -> "select * from source", BadRecordDefinition -> "name")))
+        .execute(context)
+    }
+
+    assertThrows[AnalysisException] {
+      SparkSQLMeasure(
+        param.copy(config =
+          Map(Expression -> "select 1 as my_value", BadRecordDefinition -> "name is null")))
+        .execute(context)
+    }
+  }
+
+}