| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.griffin.measure.job |
| |
| import scala.reflect.ClassTag |
| import scala.util.{Failure, Success, Try} |
| |
| import org.apache.griffin.measure.execution.Measure._ |
| import org.apache.griffin.measure.Application.readParamFile |
| import org.apache.griffin.measure.configuration.dqdefinition.EnvConfig |
| import org.apache.griffin.measure.sink.CustomSinkResultRegister |
| import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent |
| |
/**
 * Integration tests for batch data-quality jobs.
 *
 * Each test runs a job definition from the test resources via `runApp` and compares the metrics
 * captured by [[CustomSinkResultRegister]] against expected values.
 */
class BatchDQAppTest extends DQAppTest {

  /**
   * Loads `/env-batch.json` and prepares the shared Spark session for batch jobs.
   *
   * If the environment file cannot be read, the failure is logged and rethrown. ScalaTest then
   * aborts this suite; the previous `sys.exit` terminated the entire test JVM instead.
   *
   * Each Spark setting is applied on its own. A key that Spark rejects at runtime is logged and
   * skipped, so it no longer prevents the log levels and UDFs from being set up.
   */
  override def beforeAll(): Unit = {
    super.beforeAll()

    envParam = readParamFile[EnvConfig](getConfigFilePath("/env-batch.json")) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        throw ex
    }

    sparkParam = envParam.getSparkParam

    // One rejected key must not skip the remaining keys or the setup below.
    sparkParam.getConfig.foreach {
      case (k, v) =>
        Try(spark.conf.set(k, v)).failed.foreach { ex =>
          error(s"Unable to set spark config '$k' to '$v'", ex)
        }
    }

    // Log levels remain best-effort, as before.
    // The Griffin level is read before sc.setLogLevel, preserving the original order.
    // NOTE(review): presumably getGriffinLogLevel can inherit from a logger that
    // setLogLevel changes — confirm before reordering.
    Try {
      val logLevel = getGriffinLogLevel
      sc.setLogLevel(sparkParam.getLogLevel)
      griffinLogger.setLevel(logLevel)
    }.failed.foreach(ex => error("Unable to apply log levels", ex))

    // Register UDFs unconditionally. A failure here aborts the suite instead of
    // resurfacing later as unrelated-looking rule errors.
    GriffinUDFAgent.register(spark)
  }

  override def beforeEach(): Unit = {
    super.beforeEach()
    resetJobState()
  }

  override def afterEach(): Unit = {
    super.afterEach()
    resetJobState()
  }

  /** Drops the job under test and clears metrics captured by the custom sink. */
  private def resetJobState(): Unit = {
    dqApp = null
    CustomSinkResultRegister.clear()
  }

  /**
   * Verifies the metrics emitted by the job currently held in `dqApp`.
   *
   * The job must already have been run (for example via `runApp`). Despite its name, this method
   * only checks results.
   *
   * For every measure in the job definition it asserts that [[CustomSinkResultRegister]] recorded
   * metrics whose name, type and data source match the measure. The metric values must equal
   * `expectedMetrics(measureName)`, ignoring order.
   *
   * @param expectedMetrics expected metric values keyed by measure name
   */
  def runAndCheckResult(expectedMetrics: Map[String, Map[String, Any]]): Unit = {
    val measureParams = dqApp.dqParam.getMeasures
    assert(measureParams.nonEmpty, "job defines no measures")

    measureParams.foreach { param =>
      val name = param.getName

      val actualMetricsMap = CustomSinkResultRegister
        .getMetrics(name)
        .getOrElse(fail(s"no metrics were recorded for measure '$name'"))

      assertResult(name)(actualMetricsMap.get(MeasureName).orNull)
      assertResult(param.getType.toString)(actualMetricsMap.get(MeasureType).orNull)
      assertResult(param.getDataSource)(actualMetricsMap.get(DataSource).orNull)

      // Fail with a clear message instead of handing a null/cast failure to the matcher.
      val actualMetrics = actualMetricsMap.get(Metrics) match {
        case Some(m: Map[String, Any] @unchecked) => m
        case other => fail(s"measure '$name' has no metrics map, found: $other")
      }

      val expected =
        expectedMetrics.getOrElse(name, fail(s"no expected metrics given for measure '$name'"))
      actualMetrics should contain theSameElementsAs expected
    }
  }

  /**
   * Runs the job held in `dqApp` and asserts that it fails with an exception of type `T`.
   *
   * @tparam T the expected exception type
   */
  def runAndCheckException[T <: AnyRef](implicit classTag: ClassTag[T]): Unit =
    dqApp.run match {
      case Success(_) =>
        fail(
          s"job ${dqApp.dqParam.getName} should not succeed, a ${classTag.toString} exception is expected.")
      case Failure(ex) => assertThrows[T](throw ex)
    }

  "accuracy batch job" should "work" in {
    dqApp = runApp("/_accuracy-batch-griffindsl.json")
    val expectedMetrics = Map("total" -> "50", "accurate" -> "45", "inaccurate" -> "5")

    runAndCheckResult(Map("accuracy_measure" -> expectedMetrics))
  }

  "completeness batch job" should "work" in {
    dqApp = runApp("/_completeness-batch-griffindsl.json")
    val expectedMetrics = Map("total" -> "50", "incomplete" -> "1", "complete" -> "49")

    runAndCheckResult(Map("completeness_measure" -> expectedMetrics))
  }

  "duplication batch job" should "work" in {
    dqApp = runApp("/_distinctness-batch-griffindsl.json")
    val expectedMetrics =
      Map("duplicate" -> "1", "unique" -> "48", "non_unique" -> "1", "distinct" -> "49")

    runAndCheckResult(Map("duplication_measure" -> expectedMetrics))
  }

  "spark sql batch job" should "work" in {
    dqApp = runApp("/_sparksql-batch-griffindsl.json")

    val expectedMetrics =
      Map(
        "query_measure1" -> Map("total" -> "13", "complete" -> "13", "incomplete" -> "0"),
        "query_measure2" -> Map("total" -> "1", "complete" -> "0", "incomplete" -> "1"))

    runAndCheckResult(expectedMetrics)
  }

  "profiling batch job" should "work" in {
    dqApp = runApp("/_profiling-batch-griffindsl.json")

    val expectedMetrics = Map(
      "column_details" -> Map(
        "user_id" -> Map(
          "avg_col_len" -> "5.0",
          "max_col_len" -> "5",
          "variance" -> "15.17",
          "kurtosis" -> "-1.21",
          "avg" -> "10007.0",
          "min" -> "10001",
          "null_count" -> "0",
          "approx_distinct_count" -> "13",
          "total" -> "13",
          "std_dev" -> "3.89",
          "data_type" -> "bigint",
          "max" -> "10013",
          "min_col_len" -> "5"),
        "first_name" -> Map(
          "avg_col_len" -> null,
          "max_col_len" -> "6",
          "variance" -> null,
          "kurtosis" -> null,
          "avg" -> null,
          "min" -> null,
          "null_count" -> "0",
          "approx_distinct_count" -> "13",
          "total" -> "13",
          "std_dev" -> null,
          "data_type" -> "string",
          "max" -> null,
          "min_col_len" -> "6")))

    runAndCheckResult(Map("profiling_measure" -> expectedMetrics))
  }

  "batch job" should "fail with exception caught due to invalid rules" in {
    assertThrows[org.apache.spark.sql.AnalysisException] {
      runApp("/_profiling-batch-griffindsl_malformed.json")
    }
  }

  "batch job" should "fail with exception when no measures or rules are defined" in {
    assertThrows[AssertionError] {
      runApp("/_no_measure_or_rules_malformed.json")
    }
  }
}