[GRIFFIN-358] Added CompletenessMeasureTest
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
index 79ba424..c387aaa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
@@ -17,8 +17,6 @@
 
 package org.apache.griffin.measure.execution
 
-import java.util.Locale
-
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.StringUtils
@@ -28,6 +26,7 @@
 import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
 import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.DQContext
 
 trait Measure extends Loggable {
   import Measure._
@@ -46,11 +45,9 @@
   // todo add status col to persist blank metrics if the measure fails
   def preProcessMetrics(input: DataFrame): DataFrame = {
     if (supportsMetricWrite) {
-      val measureType = measureParam.getType.toString.toLowerCase(Locale.ROOT)
-
       input
         .withColumn(MeasureName, typedLit[String](measureParam.getName))
-        .withColumn(MeasureType, typedLit[String](measureType))
+        .withColumn(MeasureType, typedLit[String](measureParam.getType.toString))
         .withColumn(Metrics, col(valueColumn))
         .withColumn(DataSource, typedLit[String](measureParam.getDataSource))
         .select(MeasureName, MeasureType, DataSource, Metrics)
@@ -67,8 +64,8 @@
 
   def impl(sparkSession: SparkSession): (DataFrame, DataFrame)
 
-  def execute(sparkSession: SparkSession, batchId: Option[Long]): (DataFrame, DataFrame) = {
-    val (recordsDf, metricDf) = impl(sparkSession)
+  def execute(context: DQContext, batchId: Option[Long] = None): (DataFrame, DataFrame) = {
+    val (recordsDf, metricDf) = impl(context.sparkSession)
 
     val processedRecordDf = preProcessRecords(recordsDf)
     val processedMetricDf = preProcessMetrics(metricDf)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index 55bcff3..f5d939a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -70,7 +70,7 @@
       batchId: Option[Long] = None): Unit = {
     measureParams.foreach(measureParam => {
       val measure = createMeasure(measureParam)
-      val (recordsDf, metricsDf) = measure.execute(context.sparkSession, batchId)
+      val (recordsDf, metricsDf) = measure.execute(context, batchId)
 
       persistRecords(measure, recordsDf)
       persistMetrics(measure, metricsDf)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index 3e123d7..4ea5aa4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.griffin.measure.execution.impl
 
+import io.netty.util.internal.StringUtil
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.functions._
 
@@ -27,31 +28,26 @@
 
   import Measure._
 
-  private final val Complete: String = "complete"
-  private final val InComplete: String = "incomplete"
+  final val Complete: String = "complete"
+  final val InComplete: String = "incomplete"
 
   override val supportsRecordWrite: Boolean = true
 
   override val supportsMetricWrite: Boolean = true
 
+  val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
+
+  validate()
+
   override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
-    val exprOpt = Option(getFromConfig[String](Expression, null))
+    val exprStr = exprOpt.get
 
-    val column = exprOpt match {
-      case Some(exprStr) => when(expr(exprStr), 1.0).otherwise(0.0)
-      case None =>
-        error(
-          s"$Expression was not defined for completeness measure.",
-          new IllegalArgumentException(s"$Expression was not defined for completeness measure."))
-        throw new IllegalArgumentException(
-          s"$Expression was not defined for completeness measures")
-    }
-
-    val selectCols = Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e)))
+    val selectCols =
+      Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e).cast("string")))
     val metricColumn: Column = map(selectCols: _*).as(valueColumn)
 
     val input = sparkSession.read.table(measureParam.getDataSource)
-    val badRecordsDf = input.withColumn(valueColumn, column)
+    val badRecordsDf = input.withColumn(valueColumn, when(expr(exprStr), 1).otherwise(0))
 
     val metricDf = badRecordsDf
       .withColumn(Total, lit(1))
@@ -61,4 +57,11 @@
 
     (badRecordsDf, metricDf)
   }
+
+  private def validate(): Unit = {
+    assert(exprOpt.isDefined, s"'$Expression' must be defined.")
+    assert(exprOpt.nonEmpty, s"'$Expression' must not be empty.")
+
+    assert(!StringUtil.isNullOrEmpty(exprOpt.get), s"'$Expression' must not be null or empty.")
+  }
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
new file mode 100644
index 0000000..1242405
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.AnalysisException
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure._
+
+class CompletenessMeasureTest extends MeasureTest {
+  var param: MeasureParam = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    param = MeasureParam("param", "Completeness", "source", Map(Expression -> "name is null"))
+  }
+
+  "CompletenessMeasure" should "validate expression config" in {
+    assertThrows[AssertionError] {
+      CompletenessMeasure(param.copy(config = Map.empty[String, String]))
+    }
+
+    assertThrows[AssertionError] {
+      CompletenessMeasure(param.copy(config = Map(Expression -> StringUtils.EMPTY)))
+    }
+
+    assertThrows[AssertionError] {
+      CompletenessMeasure(param.copy(config = Map(Expression -> null)))
+    }
+  }
+
+  it should "support metric writing" in {
+    val measure = CompletenessMeasure(param)
+    assertResult(true)(measure.supportsMetricWrite)
+  }
+
+  it should "support record writing" in {
+    val measure = CompletenessMeasure(param)
+    assertResult(true)(measure.supportsRecordWrite)
+  }
+
+  it should "execute defined measure expr" in {
+    val measure = CompletenessMeasure(param)
+    val (recordsDf, metricsDf) = measure.execute(context, None)
+
+    assertResult(recordsDf.schema)(recordDfSchema)
+    assertResult(metricsDf.schema)(metricDfSchema)
+
+    assertResult(recordsDf.count())(dataSet.count())
+    assertResult(metricsDf.count())(1L)
+
+    val row = metricsDf.head()
+    assertResult(param.getDataSource)(row.getAs[String](DataSource))
+    assertResult(param.getName)(row.getAs[String](MeasureName))
+    assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+    val metricMap = row.getAs[Map[String, String]](Metrics)
+    assertResult(metricMap(Total))("5")
+    assertResult(metricMap(measure.Complete))("4")
+    assertResult(metricMap(measure.InComplete))("1")
+  }
+
+  it should "support complex measure expr" in {
+    val measure = CompletenessMeasure(
+      param.copy(config = Map(Expression -> "name is null or gender is null")))
+    val (recordsDf, metricsDf) = measure.execute(context, None)
+
+    assertResult(recordsDf.schema)(recordDfSchema)
+    assertResult(metricsDf.schema)(metricDfSchema)
+
+    assertResult(recordsDf.count())(dataSet.count())
+    assertResult(metricsDf.count())(1L)
+
+    val row = metricsDf.head()
+    assertResult(param.getDataSource)(row.getAs[String](DataSource))
+    assertResult(param.getName)(row.getAs[String](MeasureName))
+    assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+    val metricMap = row.getAs[Map[String, String]](Metrics)
+    assertResult(metricMap(Total))("5")
+    assertResult(metricMap(measure.Complete))("3")
+    assertResult(metricMap(measure.InComplete))("2")
+  }
+
+  it should "throw runtime error for invalid expr" in {
+    assertThrows[AnalysisException] {
+      CompletenessMeasure(param.copy(config = Map(Expression -> "xyz is null")))
+        .execute(context)
+    }
+
+  }
+
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/MeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/MeasureTest.scala
new file mode 100644
index 0000000..eae90e8
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/MeasureTest.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.{MapType, StringType, StructType}
+import org.scalatest.matchers.should._
+
+import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
+import org.apache.griffin.measure.context.{ContextId, DQContext}
+
+trait MeasureTest extends SparkSuiteBase with Matchers {
+
+  var sourceSchema: StructType = _
+  var recordDfSchema: StructType = _
+  var metricDfSchema: StructType = _
+  var context: DQContext = _
+
+  var dataSet: DataFrame = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    context =
+      DQContext(ContextId(System.currentTimeMillis), "test-context", Nil, Nil, BatchProcessType)(
+        spark)
+
+    sourceSchema =
+      new StructType().add("id", "integer").add("name", "string").add("gender", "string")
+
+    recordDfSchema = sourceSchema.add(Status, "string", nullable = false)
+    metricDfSchema = new StructType()
+      .add(MeasureName, "string", nullable = false)
+      .add(MeasureType, "string", nullable = false)
+      .add(DataSource, "string", nullable = false)
+      .add(Metrics, MapType(StringType, StringType), nullable = false)
+
+    dataSet = spark
+      .createDataset(
+        Seq(
+          Row(1, "John Smith", "Male"),
+          Row(2, "John Smith", null),
+          Row(3, "Rebecca Davis", "Female"),
+          Row(4, "Paul Adams", "Male"),
+          Row(5, null, null)))(RowEncoder(sourceSchema))
+      .cache()
+
+    dataSet.createOrReplaceTempView("source")
+  }
+
+}