[GRIFFIN-358] Added DuplicationMeasureTest
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
index fc6e10f..b1eaad3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -25,7 +25,6 @@
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
-import org.apache.griffin.measure.step.builder.ConstantColumns
/**
* Duplication Measure
@@ -40,14 +39,9 @@
*/
case class DuplicationMeasure(measureParam: MeasureParam) extends Measure {
+ import DuplicationMeasure._
import Measure._
- private final val Duplicate: String = "duplicate"
- private final val Unique: String = "unique"
- private final val NonUnique: String = "non_unique"
- private final val Distinct: String = "distinct"
- private final val __Temp: String = "__temp"
-
private final val duplicationMeasures = Seq(Duplicate, Unique, NonUnique, Distinct)
val exprs: String = getFromConfig[String](Expression, null)
@@ -68,18 +62,11 @@
val nonUniqueCol =
when(col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1).otherwise(0)
- val keyCols = {
- if (StringUtil.isNullOrEmpty(exprs)) input.columns
- else exprs.split(",").map(_.trim)
- }.distinct
-
- keyCols.foreach(c =>
- assert(input.columns.contains(c), s"Provided column '$c' does not exist in the dataset."))
-
- val window = Window.partitionBy(keyCols.map(col): _*).orderBy(ConstantColumns.tmst)
+ val cols = keyCols(input).map(col)
+ val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
val aggDf = input
- .select(col("*"), row_number().over(window).as(__Temp))
+ .select(col(AllColumns), row_number().over(window).as(__Temp))
.withColumn(Duplicate, duplicateCol)
.withColumn(Unique, count(lit(1)).over(window))
.withColumn(Unique, uniqueCol)
@@ -90,7 +77,7 @@
.drop(__Temp)
val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
- val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e)))
+ val selectCols = duplicationMeasures.flatMap(e => Seq(lit(e), col(e).cast("string")))
val metricColumn = map(selectCols: _*).as(valueColumn)
val metricDf = aggDf
.agg(metricAggCols.head, metricAggCols.tail: _*)
@@ -102,6 +89,13 @@
}
private def validate(): Unit = {
+ val input = SparkSession.getDefaultSession.get.read.table(measureParam.getDataSource)
+ val kc = keyCols(input)
+
+ assert(kc.nonEmpty, s"Columns defined in '$Expression' is empty.")
+ kc.foreach(c =>
+ assert(input.columns.contains(c), s"Provided column '$c' does not exist in the dataset."))
+
assert(
!StringUtil.isNullOrEmpty(badnessExpr),
s"Invalid value '$badnessExpr' provided for $BadRecordDefinition")
@@ -111,4 +105,18 @@
case _ => false
}, s"Invalid value '$badnessExpr' was provided for $BadRecordDefinition")
}
+
+  /** Distinct key columns: comma-separated `exprs` (trimmed), or all input columns if unset. */
+  private def keyCols(input: DataFrame): Array[String] =
+    (if (StringUtil.isNullOrEmpty(exprs)) input.columns
+     else exprs.split(",").map(_.trim)).distinct
+
+}
+
+object DuplicationMeasure { // names of the computed columns / keys of the metrics map
+  final val Duplicate: String = "duplicate"
+  final val Unique: String = "unique"
+  final val NonUnique: String = "non_unique"
+  final val Distinct: String = "distinct"
+  final val __Temp: String = "__temp" // scratch column holding row_number; dropped before output
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
new file mode 100644
index 0000000..7fed3d9
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.execution.impl
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.execution.impl.DuplicationMeasure._
+
+/** Tests validation, write support and metric output of [[DuplicationMeasure]]. */
+class DuplicationMeasureTest extends MeasureTest {
+  var param: MeasureParam = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    param = MeasureParam(
+      "param",
+      "Duplication",
+      "source",
+      Map(Expression -> "name", BadRecordDefinition -> "duplicate"))
+  }
+
+  "DuplicationMeasure" should "validate expression config" in {
+    // Validations for Duplication Expr
+
+    // Missing
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map.empty[String, String]))
+    }
+
+    // Empty
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> StringUtils.EMPTY)))
+    }
+
+    // Null
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> null)))
+    }
+
+    // Incorrect Type
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> 1234)))
+    }
+
+    // Validations for BadRecordDefinition
+
+    // Missing
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> "name")))
+    }
+
+    // Empty
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "")))
+    }
+
+    // Null
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> "name", BadRecordDefinition -> null)))
+    }
+
+    // Incorrect Type
+    assertThrows[AssertionError] {
+      DuplicationMeasure(param.copy(config = Map(Expression -> "name", BadRecordDefinition -> 435)))
+    }
+
+    // Incorrect Value
+    assertThrows[AssertionError] {
+      DuplicationMeasure(
+        param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "xyz")))
+    }
+  }
+
+  it should "support metric writing" in {
+    assertResult(true)(DuplicationMeasure(param).supportsMetricWrite)
+  }
+
+  it should "support record writing" in {
+    assertResult(true)(DuplicationMeasure(param).supportsRecordWrite)
+  }
+
+  it should "execute defined measure expr" in {
+    // No Expression is configured here, so every source column is part of the duplication key.
+    val measure = DuplicationMeasure(param.copy(config = Map(BadRecordDefinition -> "duplicate")))
+    val (recordsDf, metricsDf) = measure.execute(context, None)
+
+    assertResult(recordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
+
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
+
+    val row = metricsDf.head()
+    assertResult(param.getDataSource)(row.getAs[String](DataSource))
+    assertResult(param.getName)(row.getAs[String](MeasureName))
+    assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+    val metricMap = row.getAs[Map[String, String]](Metrics)
+    assertResult("0")(metricMap(Duplicate))
+    assertResult("5")(metricMap(Unique))
+    assertResult("0")(metricMap(NonUnique))
+    assertResult("5")(metricMap(Distinct))
+  }
+
+  it should "support complex measure expr" in {
+    // Only the `name` column is used as the duplication key.
+    val measure = DuplicationMeasure(
+      param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "duplicate")))
+    val (recordsDf, metricsDf) = measure.execute(context, None)
+
+    assertResult(recordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
+
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
+
+    val row = metricsDf.head()
+    assertResult(param.getDataSource)(row.getAs[String](DataSource))
+    assertResult(param.getName)(row.getAs[String](MeasureName))
+    assertResult(param.getType.toString)(row.getAs[String](MeasureType))
+
+    val metricMap = row.getAs[Map[String, String]](Metrics)
+    assertResult("1")(metricMap(Duplicate))
+    assertResult("3")(metricMap(Unique))
+    assertResult("1")(metricMap(NonUnique))
+    assertResult("4")(metricMap(Distinct))
+  }
+
+}