[GRIFFIN-358] Added CompletenessMeasureTest
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
index 79ba424..c387aaa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
@@ -17,8 +17,6 @@
package org.apache.griffin.measure.execution
-import java.util.Locale
-
import scala.reflect.ClassTag
import org.apache.commons.lang3.StringUtils
@@ -28,6 +26,7 @@
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.DQContext
trait Measure extends Loggable {
import Measure._
@@ -46,11 +45,9 @@
// todo add status col to persist blank metrics if the measure fails
def preProcessMetrics(input: DataFrame): DataFrame = {
if (supportsMetricWrite) {
- val measureType = measureParam.getType.toString.toLowerCase(Locale.ROOT)
-
input
.withColumn(MeasureName, typedLit[String](measureParam.getName))
- .withColumn(MeasureType, typedLit[String](measureType))
+ .withColumn(MeasureType, typedLit[String](measureParam.getType.toString))
.withColumn(Metrics, col(valueColumn))
.withColumn(DataSource, typedLit[String](measureParam.getDataSource))
.select(MeasureName, MeasureType, DataSource, Metrics)
@@ -67,8 +64,8 @@
def impl(sparkSession: SparkSession): (DataFrame, DataFrame)
- def execute(sparkSession: SparkSession, batchId: Option[Long]): (DataFrame, DataFrame) = {
- val (recordsDf, metricDf) = impl(sparkSession)
+ def execute(context: DQContext, batchId: Option[Long] = None): (DataFrame, DataFrame) = {
+ val (recordsDf, metricDf) = impl(context.sparkSession)
val processedRecordDf = preProcessRecords(recordsDf)
val processedMetricDf = preProcessMetrics(metricDf)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index 55bcff3..f5d939a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -70,7 +70,7 @@
batchId: Option[Long] = None): Unit = {
measureParams.foreach(measureParam => {
val measure = createMeasure(measureParam)
- val (recordsDf, metricsDf) = measure.execute(context.sparkSession, batchId)
+ val (recordsDf, metricsDf) = measure.execute(context, batchId)
persistRecords(measure, recordsDf)
persistMetrics(measure, metricsDf)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index 3e123d7..4ea5aa4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -17,6 +17,7 @@
package org.apache.griffin.measure.execution.impl
+import io.netty.util.internal.StringUtil
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
@@ -27,31 +28,26 @@
import Measure._
- private final val Complete: String = "complete"
- private final val InComplete: String = "incomplete"
+ final val Complete: String = "complete"
+ final val InComplete: String = "incomplete"
override val supportsRecordWrite: Boolean = true
override val supportsMetricWrite: Boolean = true
+ val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
+
+ validate()
+
override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
- val exprOpt = Option(getFromConfig[String](Expression, null))
+ val exprStr = exprOpt.get
- val column = exprOpt match {
- case Some(exprStr) => when(expr(exprStr), 1.0).otherwise(0.0)
- case None =>
- error(
- s"$Expression was not defined for completeness measure.",
- new IllegalArgumentException(s"$Expression was not defined for completeness measure."))
- throw new IllegalArgumentException(
- s"$Expression was not defined for completeness measures")
- }
-
- val selectCols = Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e)))
+ val selectCols =
+ Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e).cast("string")))
val metricColumn: Column = map(selectCols: _*).as(valueColumn)
val input = sparkSession.read.table(measureParam.getDataSource)
- val badRecordsDf = input.withColumn(valueColumn, column)
+ val badRecordsDf = input.withColumn(valueColumn, when(expr(exprStr), 1).otherwise(0))
val metricDf = badRecordsDf
.withColumn(Total, lit(1))
@@ -61,4 +57,11 @@
(badRecordsDf, metricDf)
}
+
+ private def validate(): Unit = {
+ assert(exprOpt.isDefined, s"'$Expression' must be defined.")
+ assert(exprOpt.nonEmpty, s"'$Expression' must not be empty.")
+
+ assert(!StringUtil.isNullOrEmpty(exprOpt.get), s"'$Expression' must not be null or empty.")
+ }
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
new file mode 100644
index 0000000..1242405
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.AnalysisException
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure._
+
+class CompletenessMeasureTest extends MeasureTest {
+ var param: MeasureParam = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ param = MeasureParam("param", "Completeness", "source", Map(Expression -> "name is null"))
+ }
+
+ "CompletenessMeasure" should "validate expression config" in {
+ assertThrows[AssertionError] {
+ CompletenessMeasure(param.copy(config = Map.empty[String, String]))
+ }
+
+ assertThrows[AssertionError] {
+ CompletenessMeasure(param.copy(config = Map(Expression -> StringUtils.EMPTY)))
+ }
+
+ assertThrows[AssertionError] {
+ CompletenessMeasure(param.copy(config = Map(Expression -> null)))
+ }
+ }
+
+ it should "support metric writing" in {
+ val measure = CompletenessMeasure(param)
+ assertResult(true)(measure.supportsMetricWrite)
+ }
+
+ it should "support record writing" in {
+ val measure = CompletenessMeasure(param)
+ assertResult(true)(measure.supportsRecordWrite)
+ }
+
+ it should "execute defined measure expr" in {
+ val measure = CompletenessMeasure(param)
+ val (recordsDf, metricsDf) = measure.execute(context, None)
+
+ assertResult(recordsDf.schema)(recordDfSchema)
+ assertResult(metricsDf.schema)(metricDfSchema)
+
+ assertResult(recordsDf.count())(dataSet.count())
+ assertResult(metricsDf.count())(1L)
+
+ val row = metricsDf.head()
+ assertResult(param.getDataSource)(row.getAs[String](DataSource))
+ assertResult(param.getName)(row.getAs[String](MeasureName))
+ assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+ val metricMap = row.getAs[Map[String, String]](Metrics)
+ assertResult(metricMap(Total))("5")
+ assertResult(metricMap(measure.Complete))("4")
+ assertResult(metricMap(measure.InComplete))("1")
+ }
+
+ it should "support complex measure expr" in {
+ val measure = CompletenessMeasure(
+ param.copy(config = Map(Expression -> "name is null or gender is null")))
+ val (recordsDf, metricsDf) = measure.execute(context, None)
+
+ assertResult(recordsDf.schema)(recordDfSchema)
+ assertResult(metricsDf.schema)(metricDfSchema)
+
+ assertResult(recordsDf.count())(dataSet.count())
+ assertResult(metricsDf.count())(1L)
+
+ val row = metricsDf.head()
+ assertResult(param.getDataSource)(row.getAs[String](DataSource))
+ assertResult(param.getName)(row.getAs[String](MeasureName))
+ assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+ val metricMap = row.getAs[Map[String, String]](Metrics)
+ assertResult(metricMap(Total))("5")
+ assertResult(metricMap(measure.Complete))("3")
+ assertResult(metricMap(measure.InComplete))("2")
+ }
+
+ it should "throw runtime error for invalid expr" in {
+ assertThrows[AnalysisException] {
+ CompletenessMeasure(param.copy(config = Map(Expression -> "xyz is null")))
+ .execute(context)
+ }
+
+ }
+
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/MeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/MeasureTest.scala
new file mode 100644
index 0000000..eae90e8
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/MeasureTest.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.{MapType, StringType, StructType}
+import org.scalatest.matchers.should._
+
+import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
+import org.apache.griffin.measure.context.{ContextId, DQContext}
+
+trait MeasureTest extends SparkSuiteBase with Matchers {
+
+ var sourceSchema: StructType = _
+ var recordDfSchema: StructType = _
+ var metricDfSchema: StructType = _
+ var context: DQContext = _
+
+ var dataSet: DataFrame = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ context =
+ DQContext(ContextId(System.currentTimeMillis), "test-context", Nil, Nil, BatchProcessType)(
+ spark)
+
+ sourceSchema =
+ new StructType().add("id", "integer").add("name", "string").add("gender", "string")
+
+ recordDfSchema = sourceSchema.add(Status, "string", nullable = false)
+ metricDfSchema = new StructType()
+ .add(MeasureName, "string", nullable = false)
+ .add(MeasureType, "string", nullable = false)
+ .add(DataSource, "string", nullable = false)
+ .add(Metrics, MapType(StringType, StringType), nullable = false)
+
+ dataSet = spark
+ .createDataset(
+ Seq(
+ Row(1, "John Smith", "Male"),
+ Row(2, "John Smith", null),
+ Row(3, "Rebecca Davis", "Female"),
+ Row(4, "Paul Adams", "Male"),
+ Row(5, null, null)))(RowEncoder(sourceSchema))
+ .cache()
+
+ dataSet.createOrReplaceTempView("source")
+ }
+
+}