[GRIFFIN-358] Fix breaking test cases

Moves MetricFlushStep inside the per-measure loop in MeasureExecutor, fixes
the accuracy join direction (source left-joined to target), migrates the
batch test configs from the old evaluate.rule schema to the new measures
schema, registers a customSink in the test environment, and reworks
BatchDQAppTest to verify metrics through CustomSinkResultRegister.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index ab51a7a..ab771e5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -74,9 +74,9 @@
persistRecords(measure, recordsDf)
persistMetrics(measure, metricsDf)
- })
- MetricFlushStep().execute(context)
+ MetricFlushStep().execute(context)
+ })
}
private def createMeasure(measureParam: MeasureParam): Measure = {
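Note on the MeasureExecutor change above: MetricFlushStep moves inside the per-measure closure, so metrics are flushed once per measure instead of once after the whole loop. A minimal sketch of the corrected ordering, with the surrounding loop shape assumed from this hunk's context (the full method is not shown in the patch):

    measures.foreach(measure => {
      // execute(context, None) matches the signature used in the tests below
      val (recordsDf, metricsDf) = measure.execute(context, None)
      persistRecords(measure, recordsDf)
      persistMetrics(measure, metricsDf)
      MetricFlushStep().execute(context) // now runs inside the closure
    })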
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
index b618010..2f1cdf3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
@@ -75,8 +75,8 @@
val nullExpr = accuracyExprs.map(e => col(e.sourceCol).isNull).reduce(_ or _)
val recordsDf = removeColumnPrefix(
- targetDataSource
- .join(dataSource, joinExpr, "outer")
+ dataSource
+ .join(targetDataSource, joinExpr, "left")
.withColumn(valueColumn, when(indicatorExpr or nullExpr, 1).otherwise(0)),
SourcePrefixStr)
.select((originalCols :+ valueColumn).map(col): _*)
@@ -161,7 +161,7 @@
}
}
-object AccuracyMeasure{
+object AccuracyMeasure {
final val SourcePrefixStr: String = "__source_"
final val TargetPrefixStr: String = "__target_"
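Note on the AccuracyMeasure change above: the join is reversed from targetDataSource.join(dataSource, "outer") to dataSource.join(targetDataSource, "left"), so every source row is preserved exactly once and unmatched or null-keyed rows are flagged as inaccurate. This is presumably what lets AccuracyMeasureTest below assert that recordsDf.count() equals source.count(). A minimal sketch reusing the expressions named in this hunk (dataSource, targetDataSource, joinExpr, indicatorExpr, nullExpr and valueColumn are defined elsewhere in the file):

    import org.apache.spark.sql.functions.when

    // Left join keeps all source rows; the indicator marks mismatches and nulls.
    val joined = dataSource
      .join(targetDataSource, joinExpr, "left")
      .withColumn(valueColumn, when(indicatorExpr or nullExpr, 1).otherwise(0))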
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 936d329..56e93ad 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -51,8 +51,7 @@
}
override def close(): Unit = {
- info(
- s"Closed ConsoleSink for job with name '$jobName' and timestamp '$timeStamp'")
+ info(s"Closed ConsoleSink for job with name '$jobName' and timestamp '$timeStamp'")
}
override def sinkRecords(records: RDD[String], name: String): Unit = {}
@@ -60,7 +59,7 @@
override def sinkRecords(records: Iterable[String], name: String): Unit = {}
override def sinkMetrics(metrics: Map[String, Any]): Unit = {
- info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
+ griffinLogger.info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
}
override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
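Note on the ConsoleSink change above: sinkMetrics now logs through griffinLogger rather than the class-level info. Read together with the log4j.properties hunk below (org.apache raised to ERROR, org.apache.griffin kept at INFO), this keeps the metrics line visible in test output. A hedged sketch, assuming griffinLogger is the shared logger under the org.apache.griffin category:

    // Assumption: this logger falls under org.apache.griffin, which stays at INFO.
    griffinLogger.info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")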
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json
index 7453b9e..9f5892b 100644
--- a/measure/src/test/resources/_accuracy-batch-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -4,48 +4,78 @@
"data.sources": [
{
"name": "source",
- "baseline": true,
"connector": {
- "type": "avro",
+ "type": "file",
"config": {
- "file.name": "src/test/resources/users_info_src.avro"
+ "format": "avro",
+ "paths": [
+ "src/test/resources/users_info_src.avro"
+ ]
}
}
},
{
"name": "target",
"connector": {
- "type": "avro",
+ "type": "file",
"config": {
- "file.name": "src/test/resources/users_info_target.avro"
+ "format": "avro",
+ "paths": [
+ "src/test/resources/users_info_target.avro"
+ ]
}
}
}
],
- "evaluate.rule": {
- "rules": [
- {
- "dsl.type": "griffin-dsl",
- "dq.type": "accuracy",
- "out.dataframe.name": "accu",
- "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
- "details": {
- "source": "source",
- "target": "target",
- "miss": "miss_count",
- "total": "total_count",
- "matched": "matched_count"
- },
- "out": [
+ "measures": [
+ {
+ "name": "accuracy_measure",
+ "type": "accuracy",
+ "data.source": "target",
+ "config": {
+ "target.source": "source",
+ "expr": [
{
- "type": "record",
- "name": "missRecords"
+ "source.col": "user_id",
+ "target.col": "user_id"
+ },
+ {
+ "source.col": "first_name",
+ "target.col": "first_name"
+ },
+ {
+ "source.col": "last_name",
+ "target.col": "last_name"
+ },
+ {
+ "source.col": "address",
+ "target.col": "address"
+ },
+ {
+ "source.col": "email",
+ "target.col": "email"
+ },
+ {
+ "source.col": "phone",
+ "target.col": "phone"
+ },
+ {
+ "source.col": "post_code",
+ "target.col": "post_code"
}
]
- }
- ]
- },
+ },
+ "out": [
+ {
+ "type": "metric",
+ "name": "accuracy_metric",
+ "flatten": "map"
+ }
+ ]
+ }
+ ],
"sinks": [
- "consoleSink"
+ "consoleSink",
+ "customSink"
]
}
\ No newline at end of file
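This config rewrite is the template for the other test resources below: the avro connector with file.name becomes a generic file connector taking format and paths, the evaluate.rule/rules block is replaced by a top-level measures array, and customSink is added alongside consoleSink so tests can assert on sunk metrics. A minimal Scala sketch of what the new file connector plausibly amounts to, assuming it forwards format and paths to Spark's reader (the connector implementation itself is not part of this patch):

    // Hypothetical mapping of { "type": "file", "config": { "format": "avro", "paths": [...] } }
    val df = spark.read
      .format("avro")
      .load("src/test/resources/users_info_src.avro")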
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
index a8fdcf7..2a7f6a4 100644
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -1,36 +1,39 @@
{
"name": "comp_batch",
"process.type": "batch",
- "timestamp": 123456,
"data.sources": [
{
"name": "source",
"connector": {
- "type": "avro",
- "version": "1.7",
+ "type": "file",
"config": {
- "file.name": "src/test/resources/users_info_src.avro"
+ "format": "avro",
+ "paths": [
+ "src/test/resources/users_info_src.avro"
+ ]
}
}
}
],
- "evaluate.rule": {
- "rules": [
- {
- "dsl.type": "griffin-dsl",
- "dq.type": "completeness",
- "out.dataframe.name": "comp",
- "rule": "email, post_code, first_name",
- "out": [
- {
- "type": "metric",
- "name": "comp"
- }
- ]
- }
- ]
- },
+ "measures": [
+ {
+ "name": "completeness_measure",
+ "type": "completeness",
+ "data.source": "source",
+ "config": {
+ "expr": "email is null or post_code is null or first_name is null"
+ },
+ "out": [
+ {
+ "type": "metric",
+ "name": "comp_metric",
+ "flatten": "map"
+ }
+ ]
+ }
+ ],
"sinks": [
- "CONSOLESINK"
+ "CONSOLESINK",
+ "customSink"
]
}
\ No newline at end of file
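Note the semantic flip in the completeness config above: the old rule listed the columns to check ("email, post_code, first_name"), while the new expr is a SQL predicate describing bad records. A sketch of the presumed evaluation, using plain Spark SQL (an assumption; CompletenessMeasure's internals are not shown in this patch):

    import org.apache.spark.sql.functions.expr

    // Rows matching the predicate count as incomplete; the rest as complete.
    val incomplete = source.filter(expr("email is null or post_code is null or first_name is null")).count()
    val complete = source.count() - incomplete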
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json
index 39db17c..2195795 100644
--- a/measure/src/test/resources/_distinctness-batch-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -4,53 +4,37 @@
"data.sources": [
{
"name": "source",
- "baseline": true,
"connector": {
- "type": "avro",
- "version": "1.7",
+ "type": "file",
"config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
- }
- },
- {
- "name": "target",
- "baseline": true,
- "connector": {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
+ "format": "avro",
+ "paths": [
+ "src/test/resources/users_info_src.avro"
+ ]
}
}
}
],
- "evaluate.rule": {
- "rules": [
- {
- "dsl.type": "griffin-dsl",
- "dq.type": "distinct",
- "out.dataframe.name": "dist",
- "rule": "user_id",
- "details": {
- "source": "source",
- "target": "target",
- "total": "total",
- "distinct": "distinct",
- "dup": "dup",
- "num": "num",
- "duplication.array": "dup"
- },
- "out": [
- {
- "type": "metric",
- "name": "distinct"
- }
- ]
- }
- ]
- },
+ "measures": [
+ {
+ "name": "duplication_measure",
+ "type": "duplication",
+ "data.source": "source",
+ "config": {
+ "expr": "user_id",
+ "bad.record.definition": "distinct"
+ },
+ "out": [
+ {
+ "type": "metric",
+ "name": "duplication_metric",
+ "flatten": "map"
+ }
+ ]
+ }
+ ],
"sinks": [
- "CONSOLESink"
+ "CONSOLESink",
+ "customSink"
]
}
\ No newline at end of file
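The distinctness config above becomes a duplication measure: the duplicate target data source is dropped, and bad.record.definition selects which rows count as bad. A hedged sketch of the four counts the duplication test below expects (duplicate, unique, non_unique, distinct), computed over user_id with plain Spark, for illustration only:

    import org.apache.spark.sql.functions.{col, count, lit}

    val grouped = source.groupBy(col("user_id")).agg(count(lit(1)).as("cnt"))
    val distinct = grouped.count()                          // 49: distinct keys
    val unique = grouped.filter(col("cnt") === 1).count()   // 48: keys seen once
    val nonUnique = grouped.filter(col("cnt") > 1).count()  // 1: keys with repeats
    val duplicate = source.count() - distinct               // 1: surplus occurrences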
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json
index aa4d749..c064594 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -1,54 +1,44 @@
{
"name": "prof_batch",
"process.type": "batch",
- "timestamp": 123456,
"data.sources": [
{
"name": "source",
"connector": {
- "type": "avro",
- "version": "1.7",
- "dataframe.name": "this_table",
+ "type": "file",
"config": {
- "file.name": "src/test/resources/users_info_src.avro"
+ "format": "avro",
+ "paths": [
+ "src/test/resources/users_info_src.avro"
+ ]
},
"pre.proc": [
- "select * from this_table where user_id < 10014"
+ "select * from this where user_id < 10014"
]
}
}
],
- "evaluate.rule": {
- "rules": [
- {
- "dsl.type": "griffin-dsl",
- "dq.type": "profiling",
- "out.dataframe.name": "prof",
- "rule": "user_id, count(*) as cnt from source group by user_id",
- "out": [
- {
- "type": "metric",
- "name": "prof",
- "flatten": "array"
- }
- ]
+ "measures": [
+ {
+ "name": "profiling_measure",
+ "type": "profiling",
+ "data.source": "source",
+ "config": {
+ "expr": "first_name, user_id",
+ "approx.distinct.count": true,
+ "round.scale": 2
},
- {
- "dsl.type": "griffin-dsl",
- "dq.type": "profiling",
- "out.dataframe.name": "grp",
- "rule": "source.post_code, count(*) as cnt from source group by source.post_code order by cnt desc",
- "out": [
- {
- "type": "metric",
- "name": "post_group",
- "flatten": "array"
- }
- ]
- }
- ]
- },
+ "out": [
+ {
+ "type": "metric",
+ "name": "prof_metric",
+ "flatten": "map"
+ }
+ ]
+ }
+ ],
"sinks": [
- "CONSOLESink"
+ "CONSOLESink",
+ "customSink"
]
}
\ No newline at end of file
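The profiling measure above now emits per-column statistics (see the expected column_details metrics in BatchDQAppTest below) instead of free-form group-by rules; approx.distinct.count and round.scale map to approximate distinct counting and rounding of the numeric stats. Note also that pre.proc now refers to the data source as this rather than a named dataframe alias. A hedged sketch of the kinds of aggregates involved, using Spark built-ins (ProfilingMeasure's actual implementation is not in this patch):

    import org.apache.spark.sql.functions._

    // Per-column stats comparable to the expected metrics, rounded to scale 2.
    source.select(
      approx_count_distinct("user_id").as("approx_distinct_count"),
      round(stddev("user_id"), 2).as("std_dev"),
      round(variance("user_id"), 2).as("variance"),
      avg("user_id").as("avg"),
      min("user_id").as("min"),
      max("user_id").as("max"))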
diff --git a/measure/src/test/resources/_sparksql-batch-griffindsl.json b/measure/src/test/resources/_sparksql-batch-griffindsl.json
new file mode 100644
index 0000000..1cf98a4
--- /dev/null
+++ b/measure/src/test/resources/_sparksql-batch-griffindsl.json
@@ -0,0 +1,59 @@
+{
+ "name": "prof_batch",
+ "process.type": "batch",
+ "data.sources": [
+ {
+ "name": "source",
+ "connector": {
+ "type": "file",
+ "config": {
+ "format": "avro",
+ "paths": [
+ "src/test/resources/users_info_src.avro"
+ ]
+ },
+ "pre.proc": [
+ "select * from this where user_id < 10014"
+ ]
+ }
+ }
+ ],
+ "measures": [
+ {
+ "name": "query_measure1",
+ "type": "sparkSQL",
+ "data.source": "source",
+ "config": {
+ "expr": "select user_id, count(*) as cnt from source group by user_id",
+ "bad.record.definition": "cnt > 1"
+ },
+ "out": [
+ {
+ "type": "metric",
+ "name": "sql_metric",
+ "flatten": "map"
+ }
+ ]
+ },
+ {
+ "name": "query_measure2",
+ "type": "sparkSQL",
+ "data.source": "source",
+ "config": {
+ "expr": "select post_code, count(*) as cnt from source group by post_code order by cnt desc",
+ "bad.record.definition": "cnt > 1"
+ },
+ "out": [
+ {
+ "type": "metric",
+ "name": "sql_metric",
+ "flatten": "map"
+ }
+ ]
+ }
+ ],
+ "sinks": [
+ "CONSOLESink",
+ "customSink"
+ ]
+}
\ No newline at end of file
diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json
index de347c7..f2c5ee7 100644
--- a/measure/src/test/resources/env-batch.json
+++ b/measure/src/test/resources/env-batch.json
@@ -12,6 +12,13 @@
"config": {
"max.log.lines": 10
}
+ },
+ {
+ "name": "customSink",
+ "type": "custom",
+ "config": {
+ "class": "org.apache.griffin.measure.sink.CustomSink"
+ }
}
],
"griffin.checkpoint": []
diff --git a/measure/src/test/resources/log4j.properties b/measure/src/test/resources/log4j.properties
index 3b408db..bc73999 100644
--- a/measure/src/test/resources/log4j.properties
+++ b/measure/src/test/resources/log4j.properties
@@ -16,15 +16,12 @@
# specific language governing permissions and limitations
# under the License.
#
-
-
log4j.rootLogger=INFO, stdout
+log4j.logger.org.apache=ERROR
+log4j.logger.DataNucleus=ERROR
+log4j.logger.org.spark_project=ERROR
+log4j.logger.org.apache.griffin=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
-log4j.logger.org.apache=WARN
-log4j.logger.org.spark_project=WARN
-
-# for travis test log
-log4j.logger.org.apache.hadoop.hive.metastore=INFO
\ No newline at end of file
diff --git a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
index 011122f..66180b1 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
@@ -22,10 +22,11 @@
import org.apache.commons.io.FileUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
import org.scalatest._
import org.scalatest.flatspec.AnyFlatSpec
-trait SparkSuiteBase extends AnyFlatSpec with BeforeAndAfterAll {
+trait SparkSuiteBase extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@transient var spark: SparkSession = _
@transient var sc: SparkContext = _
@@ -38,6 +39,9 @@
spark = SparkSession.builder
.master("local[4]")
.appName("Griffin Job Suite")
+ .config(SQLConf.SHUFFLE_PARTITIONS.key, "4")
+ .config("spark.default.parallelism", "4")
+ .config("spark.sql.crossJoin.enabled", "true")
.config(conf)
.enableHiveSupport()
.getOrCreate()
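The suite now pins shuffle partitions and default parallelism to 4 to keep local test runs fast, and enables cross joins once here instead of per test (the corresponding spark.conf.set lines are removed from BatchDQAppTest below). SQLConf.SHUFFLE_PARTITIONS.key is simply the string "spark.sql.shuffle.partitions"; a standalone equivalent of the builder:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
      .master("local[4]")
      .config("spark.sql.shuffle.partitions", "4") // == SQLConf.SHUFFLE_PARTITIONS.key
      .config("spark.default.parallelism", "4")
      .config("spark.sql.crossJoin.enabled", "true")
      .getOrCreate()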
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index f624e99..d5b5eb2 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -17,28 +17,21 @@
package org.apache.griffin.measure.configuration.dqdefinition.reader
-import org.scalatest._
import scala.util.{Failure, Success}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
-import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-import org.scalatest._
-import flatspec.AnyFlatSpec
-import matchers.should._
+import org.apache.griffin.measure.configuration.enums.MeasureTypes
+
class ParamFileReaderSpec extends AnyFlatSpec with Matchers {
"params " should "be parsed from a valid file" in {
val reader: ParamReader =
ParamFileReader(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile)
val params = reader.readConfig[DQConfig]
- params match {
- case Success(v) =>
- v.getEvaluateRule.getRules.head.getDslType should ===(GriffinDsl)
- v.getEvaluateRule.getRules.head.getOutDfName() should ===("accu")
- case Failure(e) =>
- fail("it should not happen", e)
- }
-
+ assert(params.isSuccess)
}
it should "fail for an invalid file" in {
@@ -69,16 +62,10 @@
}
it should "be parsed from a valid errorconf completeness json file" in {
- val reader: ParamReader = ParamFileReader(
- getClass.getResource("/_completeness_errorconf-batch-griffindsl.json").getFile)
+ val reader: ParamReader =
+ ParamFileReader(getClass.getResource("/_completeness-batch-griffindsl.json").getFile)
val params = reader.readConfig[DQConfig]
- params match {
- case Success(v) =>
- v.getEvaluateRule.getRules.head.getErrorConfs.length should ===(2)
- v.getEvaluateRule.getRules.head.getErrorConfs.head.getColumnName.get should ===("user")
- v.getEvaluateRule.getRules.head.getErrorConfs(1).getColumnName.get should ===("name")
- case Failure(e) =>
- fail("it should not happen", e)
- }
+ assert(params.isSuccess)
+ assert(params.get.getMeasures.forall(_.getType == MeasureTypes.Completeness))
}
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index d8bf125..db95c6a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -24,7 +24,6 @@
import org.scalatest.matchers.should._
import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
-import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
class ParamJsonReaderSpec extends AnyFlatSpec with Matchers {
@@ -36,14 +35,8 @@
val reader: ParamReader = ParamJsonReader(jsonString)
val params = reader.readConfig[DQConfig]
- params match {
- case Success(v) =>
- v.getEvaluateRule.getRules.head.getDslType should ===(GriffinDsl)
- v.getEvaluateRule.getRules.head.getOutDfName() should ===("accu")
- case Failure(_) =>
- fail("it should not happen")
- }
+ assert(params.isSuccess)
}
it should "fail for an invalid file" in {
@@ -58,7 +51,7 @@
case Success(_) =>
fail("it is an invalid config file")
case Failure(e) =>
- e.getMessage should include("evaluate.rule should not be null")
+ e.getMessage should include("Connector is undefined or invalid")
}
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
index fbbfc1b..e18efaf 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
@@ -128,11 +128,11 @@
val measure = AccuracyMeasure(param)
val (recordsDf, metricsDf) = measure.execute(context, None)
- assertResult(recordsDf.schema)(recordDfSchema)
- assertResult(metricsDf.schema)(metricDfSchema)
+ assertResult(recordDfSchema)(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
- assertResult(recordsDf.count())(source.count())
- assertResult(metricsDf.count())(1L)
+ assertResult(source.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
val row = metricsDf.head()
assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
index 492bd1b..618b96e 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
@@ -64,11 +64,11 @@
val measure = CompletenessMeasure(param)
val (recordsDf, metricsDf) = measure.execute(context, None)
- assertResult(recordsDf.schema)(recordDfSchema)
- assertResult(metricsDf.schema)(metricDfSchema)
+ assertResult(recordDfSchema)(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
- assertResult(recordsDf.count())(source.count())
- assertResult(metricsDf.count())(1L)
+ assertResult(source.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
val row = metricsDf.head()
assertResult(param.getDataSource)(row.getAs[String](DataSource))
@@ -86,11 +86,11 @@
param.copy(config = Map(Expression -> "name is null or gender is null")))
val (recordsDf, metricsDf) = measure.execute(context, None)
- assertResult(recordsDf.schema)(recordDfSchema)
- assertResult(metricsDf.schema)(metricDfSchema)
+ assertResult(recordDfSchema)(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
- assertResult(recordsDf.count())(source.count())
- assertResult(metricsDf.count())(1L)
+ assertResult(source.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
val row = metricsDf.head()
assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
index b3c2c4f..d50b202 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -104,11 +104,11 @@
val measure = DuplicationMeasure(param.copy(config = Map(BadRecordDefinition -> "duplicate")))
val (recordsDf, metricsDf) = measure.execute(context, None)
- assertResult(recordsDf.schema)(recordDfSchema)
- assertResult(metricsDf.schema)(metricDfSchema)
+ assertResult(recordDfSchema)(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
- assertResult(recordsDf.count())(source.count())
- assertResult(metricsDf.count())(1L)
+ assertResult(source.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
val row = metricsDf.head()
assertResult(param.getDataSource)(row.getAs[String](DataSource))
@@ -127,11 +127,11 @@
param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "duplicate")))
val (recordsDf, metricsDf) = measure.execute(context, None)
- assertResult(recordsDf.schema)(recordDfSchema)
- assertResult(metricsDf.schema)(metricDfSchema)
+ assertResult(recordDfSchema)(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
- assertResult(recordsDf.count())(source.count())
- assertResult(metricsDf.count())(1L)
+ assertResult(source.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
val row = metricsDf.head()
assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
index 0fc711a..15489a3 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
@@ -70,7 +70,7 @@
val (_, metricsDf) = measure.execute(context, None)
- assertResult(metricsDf.count())(1L)
+ assertResult(1L)(metricsDf.count())
val row = metricsDf.head()
assertResult(param.getDataSource)(row.getAs[String](DataSource))
@@ -94,7 +94,7 @@
val (_, metricsDf) = measure.execute(context, None)
- assertResult(metricsDf.count())(1L)
+ assertResult(1L)(metricsDf.count())
val row = metricsDf.head()
assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
index 31f73e7..0a52d9c 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
@@ -98,10 +98,9 @@
val measure = SparkSQLMeasure(param)
val (recordsDf, metricsDf) = measure.execute(context, None)
- assertResult(metricsDf.schema)(metricDfSchema)
-
- assertResult(recordsDf.count())(source.count())
- assertResult(metricsDf.count())(1L)
+ assertResult(metricDfSchema)(metricsDf.schema)
+ assertResult(source.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
val row = metricsDf.head()
assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index a95d76e..868b0ed 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -17,13 +17,13 @@
package org.apache.griffin.measure.job
-import org.apache.spark.sql.AnalysisException
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
+import org.apache.griffin.measure.execution.Measure._
import org.apache.griffin.measure.Application.readParamFile
import org.apache.griffin.measure.configuration.dqdefinition.EnvConfig
-import org.apache.griffin.measure.launch.batch.BatchDQApp
+import org.apache.griffin.measure.sink.CustomSinkResultRegister
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
class BatchDQAppTest extends DQAppTest {
@@ -42,8 +42,6 @@
Try {
sparkParam.getConfig.foreach { case (k, v) => spark.conf.set(k, v) }
- spark.conf.set("spark.app.name", "BatchDQApp Test")
- spark.conf.set("spark.sql.crossJoin.enabled", "true")
val logLevel = getGriffinLogLevel
sc.setLogLevel(sparkParam.getLogLevel)
@@ -54,21 +52,41 @@
}
}
- def runAndCheckResult(metrics: Map[String, Any]): Unit = {
- dqApp.run match {
- case Success(ret) => assert(ret)
- case Failure(ex) =>
- error(s"process run error: ${ex.getMessage}", ex)
- throw ex
- }
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ dqApp = null
+ CustomSinkResultRegister.clear()
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+
+ dqApp = null
+ CustomSinkResultRegister.clear()
+ }
+
+ def runAndCheckResult(expectedMetrics: Map[String, Map[String, Any]]): Unit = {
// check Result Metrics
- val dqContext = dqApp.asInstanceOf[BatchDQApp].dqContext
- val timestamp = dqContext.contextId.timestamp
- val expectedMetrics =
- Map(timestamp -> metrics)
- dqContext.metricWrapper.metrics should equal(expectedMetrics)
+ val measureNames = dqApp.dqParam.getMeasures
+ assert(measureNames.nonEmpty)
+
+ measureNames.foreach(param => {
+ val actualMetricsOpt = CustomSinkResultRegister.getMetrics(param.getName)
+ assert(actualMetricsOpt.isDefined)
+
+ val actualMetricsMap = actualMetricsOpt.get
+
+ assertResult(param.getName)(actualMetricsMap.get(MeasureName).orNull)
+ assertResult(param.getType.toString)(actualMetricsMap.get(MeasureType).orNull)
+ assertResult(param.getDataSource)(actualMetricsMap.get(DataSource).orNull)
+
+ val actualMetrics = actualMetricsMap.getOrElse(Metrics, null).asInstanceOf[Map[String, Any]]
+
+ assert(expectedMetrics.contains(param.getName))
+ actualMetrics should contain theSameElementsAs expectedMetrics(param.getName)
+ })
}
def runAndCheckException[T <: AnyRef](implicit classTag: ClassTag[T]): Unit = {
@@ -81,79 +99,80 @@
}
"accuracy batch job" should "work" in {
- dqApp = initApp("/_accuracy-batch-griffindsl.json")
- val expectedMetrics = Map(
- "total_count" -> 50,
- "miss_count" -> 4,
- "matched_count" -> 46,
- "matchedFraction" -> 0.92)
+ dqApp = runApp("/_accuracy-batch-griffindsl.json")
+ val expectedMetrics = Map("total" -> "50", "accurate" -> "45", "inaccurate" -> "5")
- runAndCheckResult(expectedMetrics)
+ runAndCheckResult(Map("accuracy_measure" -> expectedMetrics))
}
"completeness batch job" should "work" in {
- dqApp = initApp("/_completeness-batch-griffindsl.json")
- val expectedMetrics = Map("total" -> 50, "incomplete" -> 1, "complete" -> 49)
+ dqApp = runApp("/_completeness-batch-griffindsl.json")
+ val expectedMetrics = Map("total" -> "50", "incomplete" -> "1", "complete" -> "49")
- runAndCheckResult(expectedMetrics)
+ runAndCheckResult(Map("completeness_measure" -> expectedMetrics))
}
- "distinctness batch job" should "work" in {
- dqApp = initApp("/_distinctness-batch-griffindsl.json")
+ "duplication batch job" should "work" in {
+ dqApp = runApp("/_distinctness-batch-griffindsl.json")
+ val expectedMetrics =
+ Map("duplicate" -> "1", "unique" -> "48", "non_unique" -> "1", "distinct" -> "49")
+
+ runAndCheckResult(Map("duplication_measure" -> expectedMetrics))
+ }
+
+ "spark sql batch job" should "work" in {
+ dqApp = runApp("/_sparksql-batch-griffindsl.json")
val expectedMetrics =
- Map("total" -> 50, "distinct" -> 49, "dup" -> Seq(Map("dup" -> 1, "num" -> 1)))
+ Map(
+ "query_measure1" -> Map("total" -> "13", "complete" -> "13", "incomplete" -> "0"),
+ "query_measure2" -> Map("total" -> "1", "complete" -> "0", "incomplete" -> "1"))
runAndCheckResult(expectedMetrics)
}
"profiling batch job" should "work" in {
- dqApp = initApp("/_profiling-batch-griffindsl.json")
+ dqApp = runApp("/_profiling-batch-griffindsl.json")
+
val expectedMetrics = Map(
- "prof" -> Seq(
- Map("user_id" -> 10004, "cnt" -> 1),
- Map("user_id" -> 10011, "cnt" -> 1),
- Map("user_id" -> 10010, "cnt" -> 1),
- Map("user_id" -> 10002, "cnt" -> 1),
- Map("user_id" -> 10006, "cnt" -> 1),
- Map("user_id" -> 10001, "cnt" -> 1),
- Map("user_id" -> 10005, "cnt" -> 1),
- Map("user_id" -> 10008, "cnt" -> 1),
- Map("user_id" -> 10013, "cnt" -> 1),
- Map("user_id" -> 10003, "cnt" -> 1),
- Map("user_id" -> 10007, "cnt" -> 1),
- Map("user_id" -> 10012, "cnt" -> 1),
- Map("user_id" -> 10009, "cnt" -> 1)),
- "post_group" -> Seq(Map("post_code" -> "94022", "cnt" -> 13)))
+ "column_details" -> Map(
+ "user_id" -> Map(
+ "avg_col_len" -> "5.0",
+ "max_col_len" -> "5",
+ "variance" -> "15.17",
+ "kurtosis" -> "-1.21",
+ "avg" -> "10007.0",
+ "min" -> "10001",
+ "null_count" -> "0",
+ "approx_distinct_count" -> "13",
+ "total" -> "13",
+ "std_dev" -> "3.89",
+ "data_type" -> "bigint",
+ "max" -> "10013",
+ "min_col_len" -> "5"),
+ "first_name" -> Map(
+ "avg_col_len" -> null,
+ "max_col_len" -> "6",
+ "variance" -> null,
+ "kurtosis" -> null,
+ "avg" -> null,
+ "min" -> null,
+ "null_count" -> "0",
+ "approx_distinct_count" -> "13",
+ "total" -> "13",
+ "std_dev" -> null,
+ "data_type" -> "string",
+ "max" -> null,
+ "min_col_len" -> "6")))
- runAndCheckResult(expectedMetrics)
- }
-
- "timeliness batch job" should "work" in {
- dqApp = initApp("/_timeliness-batch-griffindsl.json")
- val expectedMetrics = Map(
- "total" -> 10,
- "avg" -> 276000,
- "percentile_95" -> 660000,
- "step" -> Seq(
- Map("step" -> 0, "cnt" -> 6),
- Map("step" -> 5, "cnt" -> 2),
- Map("step" -> 3, "cnt" -> 1),
- Map("step" -> 4, "cnt" -> 1)))
-
- runAndCheckResult(expectedMetrics)
- }
-
- "uniqueness batch job" should "work" in {
- dqApp = initApp("/_uniqueness-batch-griffindsl.json")
- val expectedMetrics = Map("total" -> 50, "unique" -> 48)
-
- runAndCheckResult(expectedMetrics)
+ runAndCheckResult(Map("profiling_measure" -> expectedMetrics))
}
"batch job" should "fail with exception caught due to invalid rules" in {
- dqApp = initApp("/_profiling-batch-griffindsl_malformed.json")
+ assertThrows[java.lang.AssertionError] {
+ runApp("/_profiling-batch-griffindsl_malformed.json")
+ }
- runAndCheckException[AnalysisException]
+ assertThrows[NullPointerException](runAndCheckException)
}
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index 9fc9883..e24bcd5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -46,12 +46,11 @@
}
}
- def initApp(dqParamFile: String): DQApp = {
+ def runApp(dqParamFile: String): DQApp = {
val dqParam = readParamFile[DQConfig](getConfigFilePath(dqParamFile)) match {
case Success(p) => p
case Failure(ex) =>
- error(ex.getMessage, ex)
- sys.exit(-2)
+ throw ex
}
val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
@@ -67,6 +66,14 @@
}
dqApp.sparkSession = spark
+
+ dqApp.run match {
+ case Success(ret) => assert(ret)
+ case Failure(ex) =>
+ error(s"process run error: ${ex.getMessage}", ex)
+ throw ex
+ }
+
dqApp
}
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
index 3d0aa0e..93aa0ff 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -24,12 +24,12 @@
import org.apache.spark.sql.DataFrame
/**
- * sink records and metrics in memory for test.
+ * A dummy batch sink for testing.
*
* @param config sink configurations
- * @param jobName
- * @param timeStamp
- * @param block
+ * @param jobName Griffin Job Name
+ * @param timeStamp timestamp for job
+ * @param block whether the sink blocks
*/
case class CustomSink(config: Map[String, Any], jobName: String, timeStamp: Long, block: Boolean)
extends Sink {
@@ -50,10 +50,46 @@
val allMetrics: mutable.Map[String, Any] = mutable.Map[String, Any]()
override def sinkMetrics(metrics: Map[String, Any]): Unit = {
+ val (metricName: String, value: Map[String, Any]) = metrics("value")
+ .asInstanceOf[Map[String, Any]]
+ .head
+
+ CustomSinkResultRegister.setMetrics(metricName, value)
+
allMetrics ++= metrics
}
override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
+ CustomSinkResultRegister.setBatch(key.get, dataset.toJSON.collect())
allRecords ++= dataset.toJSON.rdd.collect()
}
}
+
+/**
+ * Register for storing test sink results in memory
+ */
+object CustomSinkResultRegister {
+
+ private val _metricsSink: mutable.Map[String, Map[String, Any]] = mutable.HashMap.empty
+ private val _batchSink: mutable.Map[String, Array[String]] = mutable.HashMap.empty
+
+ def setMetrics(key: String, metrics: Map[String, Any]): Unit = {
+ val updatedMetrics = _metricsSink.getOrElse(key, Map.empty) ++ metrics
+ _metricsSink.put(key, updatedMetrics)
+ }
+
+ def getMetrics(key: String): Option[Map[String, Any]] = _metricsSink.get(key)
+
+ def setBatch(key: String, batch: Array[String]): Unit = {
+ val updatedBatch = _batchSink.getOrElse(key, Array.empty) ++ batch
+ _batchSink.put(key, updatedBatch)
+ }
+
+ def getBatch(key: String): Option[Array[String]] = _batchSink.get(key)
+
+ def clear(): Unit = {
+ _metricsSink.clear()
+ _batchSink.clear()
+ }
+
+}
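The register keys results by measure name, which is why sinkMetrics above unwraps the payload as Map("value" -> Map(measureName -> metricsMap)) before storing it, and why BatchDQAppTest clears the register in beforeEach/afterEach. A short usage sketch (values illustrative):

    // Store and read back metrics for one measure, then reset between tests.
    CustomSinkResultRegister.setMetrics("accuracy_measure", Map("total" -> "50"))
    val saved = CustomSinkResultRegister.getMetrics("accuracy_measure") // Some(Map("total" -> "50"))
    CustomSinkResultRegister.clear()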
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index 8bf81b1..55e1e9c 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -19,8 +19,10 @@
import scala.collection.mutable
+import org.apache.spark.sql.functions._
+
import org.apache.griffin.measure.configuration.dqdefinition.{RuleOutputParam, SinkParam}
-import org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
+import org.apache.griffin.measure.configuration.enums.FlattenType.MapFlattenType
import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep, RecordWriteStep}
class CustomSinkTest extends SinkTestBase {
@@ -40,29 +42,30 @@
}
"custom sink" can "sink metrics" in {
- val actualMetrics = withCustomSink(sinks => {
+ val measureName = "test_measure"
+ withCustomSink(sinks => {
sinks.foreach { sink =>
try {
- sink.sinkMetrics(Map("sum" -> 10))
+ sink.sinkMetrics(Map("value" -> Map(measureName -> Map("sum" -> 10))))
} catch {
case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
}
}
+
sinks.foreach { sink =>
try {
- sink.sinkMetrics(Map("count" -> 5))
+ sink.sinkMetrics(Map("value" -> Map(measureName -> Map("count" -> 5))))
} catch {
case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
}
}
- sinks.headOption match {
- case Some(sink: CustomSink) => sink.allMetrics
- case _ => Map.empty
- }
})
+ val actualMetricsOpt = CustomSinkResultRegister.getMetrics(measureName)
+ assert(actualMetricsOpt.isDefined)
+
val expected = Map("sum" -> 10, "count" -> 5)
- actualMetrics should be(expected)
+ actualMetricsOpt.get should contain theSameElementsAs expected
}
"custom sink" can "sink records" in {
@@ -134,33 +137,44 @@
"MetricWriteStep" should "output default metrics with custom sink" in {
val resultTable = "result_table"
val df = createDataFrame(1 to 5)
- df.groupBy("sex")
- .agg("age" -> "max", "age" -> "avg")
- .createOrReplaceTempView(resultTable)
+ val metricCols = Seq("sex", "max_age", "avg_age").flatMap(c => Seq(lit(c), col(c)))
+
+ val metricDf = df
+ .groupBy("sex")
+ .agg(max("age").as("max_age"), avg("age").as("avg_age"))
+ .select(map(metricCols: _*).as("metrics"))
+ .withColumn("mark", lit(1))
+ .groupBy("mark")
+ .agg(collect_list("metrics") as "metrics")
+ .select("metrics")
+
+ metricDf.createOrReplaceTempView(resultTable)
val dQContext = getDqContext()
val metricWriteStep = {
- val metricOpt = Some(metricsDefaultOutput)
+ val metricOpt = Some(metricsMapOutput)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse("default_metrics_name")
- val flattenType = metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
+ val flattenType = metricOpt.map(_.getFlatten).getOrElse(MapFlattenType)
+
MetricWriteStep(mwName, resultTable, flattenType)
}
metricWriteStep.execute(dQContext)
MetricFlushStep().execute(dQContext)
- val actualMetrics = dQContext.getSinks.headOption match {
- case Some(sink: CustomSink) => sink.allMetrics
- case _ => mutable.Map[String, Any]()
- }
- val metricsValue = Seq(
- Map("sex" -> "man", "max(age)" -> 19, "avg(age)" -> 18.0),
- Map("sex" -> "women", "max(age)" -> 20, "avg(age)" -> 18.0))
+ val expectedMetrics = Array(
+ Map("sex" -> "women", "max_age" -> "20", "avg_age" -> "18.0"),
+ Map("sex" -> "man", "max_age" -> "19", "avg_age" -> "18.0"))
- val expected = Map("default_output" -> metricsValue)
+ val actualMetricsOpt = CustomSinkResultRegister.getMetrics(metricWriteStep.name)
+ assert(actualMetricsOpt.isDefined)
- actualMetrics("value") should be(expected)
+ val actualMetricsMap: Map[String, Any] = actualMetricsOpt.get
+ assert(actualMetricsMap.contains("metrics"))
+
+ val actualMetrics = actualMetricsMap("metrics").asInstanceOf[Seq[Map[String, String]]]
+ actualMetrics should contain theSameElementsAs expectedMetrics
}
}