[GRIFFIN-358] Added SparkSqlMeasureTest
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
index fec0d02..b32aa42 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
@@ -13,31 +13,27 @@
import Measure._
- private final val Complete: String = "complete"
- private final val InComplete: String = "incomplete"
+ final val Complete: String = "complete"
+ final val InComplete: String = "incomplete"
override val supportsRecordWrite: Boolean = true
override val supportsMetricWrite: Boolean = true
+ private val expr = getFromConfig[String](Expression, StringUtils.EMPTY)
private val badnessExpr = getFromConfig[String](BadRecordDefinition, StringUtils.EMPTY)
+ validate()
+
override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
-
- val expr = getFromConfig[String](Expression, StringUtils.EMPTY)
-
- assert(!StringUtil.isNullOrEmpty(expr), "Invalid query provided as expr.")
- assert(
- !StringUtil.isNullOrEmpty(badnessExpr),
- "Invalid condition provided as bad.record.definition.")
-
val df = sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))
assert(
df.schema.exists(f => f.name.matches(valueColumn) && f.dataType.isInstanceOf[BooleanType]),
s"Invalid condition provided as $BadRecordDefinition. Does not yield a boolean result.")
- val selectCols = Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e)))
+ val selectCols =
+ Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e).cast("string")))
val metricColumn: Column = map(selectCols: _*).as(valueColumn)
val badRecordsDf = df.withColumn(valueColumn, when(col(valueColumn), 1).otherwise(0))
@@ -49,4 +45,13 @@
(badRecordsDf, metricDf)
}
+
+ private def validate(): Unit = {
+ assert(
+ !StringUtil.isNullOrEmpty(expr),
+ "Invalid query provided as expr. Must not be null, empty or of invalid type.")
+ assert(
+ !StringUtil.isNullOrEmpty(badnessExpr),
+ "Invalid condition provided as bad.record.definition.")
+ }
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
index baec723..492bd1b 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
@@ -19,6 +19,7 @@
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure._
@@ -108,6 +109,10 @@
.execute(context)
}
+ assertThrows[ParseException] {
+ CompletenessMeasure(param.copy(config = Map(Expression -> "select 1")))
+ .execute(context)
+ }
}
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
new file mode 100644
index 0000000..eed4b46
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.AnalysisException
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure._
+
+class SparkSqlMeasureTest extends MeasureTest {
+ var param: MeasureParam = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ param = MeasureParam(
+ "param",
+ "SparkSql",
+ "source",
+ Map(
+ Expression -> "select * from source",
+ BadRecordDefinition -> "name is null or gender is null"))
+ }
+
+ "SparkSqlMeasure" should "validate expression config" in {
+
+ // Validations for Expression
+
+ // Missing key
+ assertThrows[AssertionError] {
+ SparkSQLMeasure(param.copy(config = Map.empty[String, String]))
+ }
+
+ // Empty string
+ assertThrows[AssertionError] {
+ SparkSQLMeasure(param.copy(config = Map(Expression -> StringUtils.EMPTY)))
+ }
+
+ // Null
+ assertThrows[AssertionError] {
+ SparkSQLMeasure(param.copy(config = Map(Expression -> null)))
+ }
+
+ // Incorrect Type
+ assertThrows[AssertionError] {
+ SparkSQLMeasure(param.copy(config = Map(Expression -> 943)))
+ }
+
+ // Validations for BadRecordDefinition
+
+ // Empty
+ assertThrows[AssertionError] {
+ SparkSQLMeasure(
+ param.copy(
+ config = Map(Expression -> "select 1", BadRecordDefinition -> StringUtils.EMPTY)))
+ }
+
+ // Incorrect Type
+ assertThrows[AssertionError] {
+ SparkSQLMeasure(
+ param.copy(config = Map(Expression -> "select 1", BadRecordDefinition -> 2344)))
+ }
+
+ // Null
+ assertThrows[AssertionError] {
+ SparkSQLMeasure(
+ param.copy(config = Map(Expression -> "select 1", BadRecordDefinition -> null)))
+ }
+
+ }
+
+ it should "support metric writing" in {
+ val measure = SparkSQLMeasure(param)
+ assertResult(true)(measure.supportsMetricWrite)
+ }
+
+ it should "support record writing" in {
+ val measure = SparkSQLMeasure(param)
+ assertResult(true)(measure.supportsRecordWrite)
+ }
+
+ it should "execute defined measure expr" in {
+ val measure = SparkSQLMeasure(param)
+ val (recordsDf, metricsDf) = measure.execute(context, None)
+
+ assertResult(metricsDf.schema)(metricDfSchema)
+
+ assertResult(recordsDf.count())(source.count())
+ assertResult(metricsDf.count())(1L)
+
+ val row = metricsDf.head()
+ assertResult(param.getDataSource)(row.getAs[String](DataSource))
+ assertResult(param.getName)(row.getAs[String](MeasureName))
+ assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+ val metricMap = row.getAs[Map[String, String]](Metrics)
+ assertResult(metricMap(Total))("5")
+ assertResult(metricMap(measure.Complete))("3")
+ assertResult(metricMap(measure.InComplete))("2")
+ }
+
+ it should "throw runtime error for invalid expr" in {
+ assertThrows[AssertionError] {
+ SparkSQLMeasure(
+ param.copy(
+ config = Map(Expression -> "select * from source", BadRecordDefinition -> "name")))
+ .execute(context)
+ }
+
+ assertThrows[AnalysisException] {
+ SparkSQLMeasure(
+ param.copy(config =
+ Map(Expression -> "select 1 as my_value", BadRecordDefinition -> "name is null")))
+ .execute(context)
+ }
+ }
+
+}