[GRIFFIN-365] Measure Enhancements and Stability fixes (#593)
* [GRIFFIN-365] Update pom.xml with scapegoat and other changes
* [GRIFFIN-365] Remove ban on elasticsearch-spark dependency
* [GRIFFIN-365] Measure enhancements
* [GRIFFIN-365] Fix test cases
* [GRIFFIN-365] Updates to documentation and fix for breaking tests
* [GRIFFIN-365] Revert elasticsearch changes
* Update schema_conformance.md
* Update sparksql.md
diff --git a/griffin-doc/measure/measure-configuration-guide/accuracy.md b/griffin-doc/measure/measure-configuration-guide/accuracy.md
index 95fe54b..f0ad0e5 100644
--- a/griffin-doc/measure/measure-configuration-guide/accuracy.md
+++ b/griffin-doc/measure/measure-configuration-guide/accuracy.md
@@ -242,4 +242,4 @@
A new column `__status` has been added to the original data set on which this measure was executed. The value of this
column can be either `bad` or `good`, which can be used to calculate the metrics or to separate data based on quality, etc.
-_Note:_ This output is for `ConsoleSink`.
\ No newline at end of file
+_Note:_ These outputs are for `ConsoleSink`.
\ No newline at end of file
diff --git a/griffin-doc/measure/measure-configuration-guide/completeness.md b/griffin-doc/measure/measure-configuration-guide/completeness.md
index accd5a5..e932156 100644
--- a/griffin-doc/measure/measure-configuration-guide/completeness.md
+++ b/griffin-doc/measure/measure-configuration-guide/completeness.md
@@ -31,9 +31,9 @@
users to define SQL-like expressions which describe their definition of completeness. For a tabular data set with
columns `name`, `email` and `age`, some examples of such completeness definitions are mentioned below,
-- `name is NULL`
-- `name is NULL and age is NULL`
-- `email NOT RLIKE '^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'`
+- `name is NOT NULL`
+- `name is NOT NULL and age is NOT NULL`
+- `email RLIKE '^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$'`
### Configuration
@@ -49,7 +49,7 @@
"type": "completeness",
"data.source": "crime_report_source",
"config": {
- "expr": "zipcode is null OR city is null"
+ "expr": "zipcode is not null and city is not null"
},
"out": [
{
@@ -84,8 +84,8 @@
The `config` object for the completeness measure contains only one key, `expr`. The value for `expr` is a SQL-like expression
string which defines completeness. For more complex definitions, expressions can be combined with `AND` and `OR`.
-_Note:_ This expression describes the bad or incomplete records. This means that for `"expr": "zipcode is NULL"` the
-records which contain `null` in zipcode column are considered as incomplete.
+_Note:_ This expression describes the good or complete records. This means that for `"expr": "zipcode is NOT NULL"` the
+records which contain `null` in the `zipcode` column are considered incomplete.
It can be defined as mentioned below,
@@ -94,7 +94,7 @@
...
"config": {
- "expr": "zipcode is null OR city is null"
+ "expr": "zipcode is NOT null AND city is NOT null"
}
...
@@ -136,7 +136,6 @@
{
"applicationId": "local-1623452412322",
"job_name": "Batch-All-Measures-Example",
- "tmst": 1623452423891,
"measure_name": "completeness_measure",
"metrics": [
{
@@ -209,4 +208,4 @@
A new column `__status` has been added to the original data set on which this measure was executed. The value of this
column can be either `bad` or `good`, which can be used to calculate the metrics or to separate data based on quality, etc.
-_Note:_ This output is for `ConsoleSink`.
\ No newline at end of file
+_Note:_ These outputs are for `ConsoleSink`.
\ No newline at end of file
diff --git a/griffin-doc/measure/measure-configuration-guide/duplication.md b/griffin-doc/measure/measure-configuration-guide/duplication.md
index 15e7408..71a79df 100644
--- a/griffin-doc/measure/measure-configuration-guide/duplication.md
+++ b/griffin-doc/measure/measure-configuration-guide/duplication.md
@@ -254,4 +254,4 @@
column can be either `bad` or `good`, which can be used to calculate the metrics or to separate data based on quality, etc.
These values for the `__status` column are based on the value of the user-defined key `bad.record.definition`.
-_Note:_ This output is for `ConsoleSink`.
\ No newline at end of file
+_Note:_ These outputs are for `ConsoleSink`.
\ No newline at end of file
diff --git a/griffin-doc/measure/measure-configuration-guide/profiling.md b/griffin-doc/measure/measure-configuration-guide/profiling.md
index b6ee6fa..5c7873b 100644
--- a/griffin-doc/measure/measure-configuration-guide/profiling.md
+++ b/griffin-doc/measure/measure-configuration-guide/profiling.md
@@ -207,4 +207,6 @@
```
_Note:_ Some mathematical metrics are bound to the type of attribute under consideration, for example standard deviation
-cannot be calculated for a column name of string type, thus, the value for these metrics are null for such columns.
\ No newline at end of file
+cannot be calculated for a column of string type; thus, the values for these metrics are null for such columns.
+
+_Note:_ This output is for `ConsoleSink`.
\ No newline at end of file
diff --git a/griffin-doc/measure/measure-configuration-guide/schema_conformance.md b/griffin-doc/measure/measure-configuration-guide/schema_conformance.md
index 768c0ca..2f47dde 100644
--- a/griffin-doc/measure/measure-configuration-guide/schema_conformance.md
+++ b/griffin-doc/measure/measure-configuration-guide/schema_conformance.md
@@ -215,4 +215,4 @@
A new column `__status` has been added to the source data set that acted as input to this measure. The value of this
column can be either `bad` or `good`, which can be used to calculate the metrics or to separate data based on quality, etc.
-_Note:_ This output is for `ConsoleSink`.
\ No newline at end of file
+_Note:_ These outputs are for `ConsoleSink`.
diff --git a/griffin-doc/measure/measure-configuration-guide/sparksql.md b/griffin-doc/measure/measure-configuration-guide/sparksql.md
index d8ca39e..e82c743 100644
--- a/griffin-doc/measure/measure-configuration-guide/sparksql.md
+++ b/griffin-doc/measure/measure-configuration-guide/sparksql.md
@@ -211,7 +211,7 @@
column can be either `bad` or `good`, which can be used to calculate the metrics or to separate data based on quality, etc.
These values for the `__status` column are based on the value of the user-defined key `bad.record.definition`.
-_Note:_ This output is for `ConsoleSink`.
+_Note:_ These outputs are for `ConsoleSink`.
**Further Reading**
@@ -224,4 +224,4 @@
`bad.record.definition: "__measure_{measure_name}"` in the measure config.
For example if the user-defined name of the measure is `spark_sql_measure`, the alias column name
-becomes `__measure_spark_sql_measure`.
\ No newline at end of file
+becomes `__measure_spark_sql_measure`.
diff --git a/measure/pom.xml b/measure/pom.xml
index 3cd8bc6..6d97107 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -58,6 +58,10 @@
<scoverage.plugin.version>1.4.0</scoverage.plugin.version>
<mockito.version>3.2.2.0</mockito.version>
<scalatest.version>3.2.3</scalatest.version>
+ <maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
+ <scalatest.maven.plugin.version>2.0.2</scalatest.maven.plugin.version>
+ <scapegoat.version>1.4.8</scapegoat.version>
</properties>
<dependencies>
@@ -68,6 +72,13 @@
<version>${scala.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.sksamuel.scapegoat</groupId>
+ <artifactId>scalac-scapegoat-plugin_${scala.version}</artifactId>
+ <version>${scapegoat.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Spark Dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
@@ -249,9 +260,13 @@
<arg>-deprecation</arg>
<arg>-feature</arg>
<arg>-explaintypes</arg>
+ <arg>-P:scapegoat:dataDir:${project.build.directory}</arg>
+ <arg>-P:scapegoat:reports:scalastyle</arg>
+ <arg>
+ -P:scapegoat:overrideLevels:UnsafeTraversableMethods=Warning:OptionGet=Warning:TryGet=Warning
+ </arg>
</args>
<jvmArgs>
- <jvmArg>-Xms64m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
<javacArgs>
@@ -260,19 +275,41 @@
<javacArg>${java.version}</javacArg>
<javacArg>-Xlint:all,-serial,-path,-try</javacArg>
</javacArgs>
+ <compilerPlugins>
+ <compilerPlugin>
+ <groupId>com.sksamuel.scapegoat</groupId>
+ <artifactId>scalac-scapegoat-plugin_${scala.version}</artifactId>
+ <version>${scapegoat.version}</version>
+ </compilerPlugin>
+ </compilerPlugins>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven.surefire.plugin.version}</version>
+ <configuration>
+ <skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
- <version>2.0.2</version>
+ <version>${scalatest.maven.plugin.version}</version>
<configuration>
<testFailureIgnore>false</testFailureIgnore>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>TestSuiteReport.txt</filereports>
+ <parallel>false</parallel>
+ <forkMode>once</forkMode>
<stderr/>
+ <argLine>
+ -ea -Xmx4g -Xss4m -XX:ReservedCodeCacheSize=1g -Dio.netty.tryReflectionSetAccessible=true
+ -Dfile.encoding=${project.build.sourceEncoding}
+ </argLine>
<systemProperties>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
</systemProperties>
@@ -297,18 +334,29 @@
<goals>
<goal>shade</goal>
</goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>griffin.org.apache.http</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.zaxxer</pattern>
+ <shadedPattern>griffin.com.zaxxer</shadedPattern>
+ </relocation>
+ </relocations>
+ <finalName>${project.build.finalName}-with-dependencies</finalName>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.griffin.measure.Application</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
</execution>
</executions>
- <configuration>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- <minimizeJar>true</minimizeJar>
- <relocations>
- <relocation>
- <pattern>org.apache.http</pattern>
- <shadedPattern>griffin.org.apache.http</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
</plugin>
<plugin>
@@ -324,7 +372,7 @@
</configuration>
<executions>
<execution>
- <phase>compile</phase>
+ <phase>validate</phase>
<goals>
<goal>format</goal>
</goals>
@@ -350,7 +398,7 @@
</configuration>
<executions>
<execution>
- <phase>compile</phase>
+ <phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
@@ -359,86 +407,18 @@
</plugin>
<plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- <version>0.8.2</version>
- <executions>
- <execution>
- <id>default-prepare-agent</id>
- <goals>
- <goal>prepare-agent</goal>
- </goals>
- </execution>
- <execution>
- <id>report</id>
- <phase>test</phase>
- <goals>
- <goal>report</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<groupId>org.scoverage</groupId>
<artifactId>scoverage-maven-plugin</artifactId>
<version>${scoverage.plugin.version}</version>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
- <aggregate>true</aggregate>
+ <aggregateOnly>true</aggregateOnly>
<highlighting>true</highlighting>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>3.2.0</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-${project.version}</finalName>
- <classifier>spark-${spark.major.version}_scala-${scala.binary.version}</classifier>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.3.0</version>
- <configuration>
- <tarLongFileMode>posix</tarLongFileMode>
- <finalName>
- ${project.artifactId}-${project.version}-spark-${spark.major.version}_scala-${scala.binary.version}
- </finalName>
- <archive>
- <manifest>
- <mainClass>org.apache.griffin.measure.Application</mainClass>
- </manifest>
- </archive>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M2</version>
<executions>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 1765f4d..7c695c6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -89,7 +89,7 @@
assert(
repeatedMeasures.isEmpty,
- s"Measure names must be unique. " +
+ "Measure names must be unique. " +
s"Duplicate Measures names ['${repeatedMeasures.mkString("', '")}'] were found.")
val invalidMeasureSources = measures
@@ -100,7 +100,7 @@
assert(
invalidMeasureSources.isEmpty,
- s"Measure source(s) undefined." +
+ "Measure source(s) undefined." +
s" Unknown source(s) ['${invalidMeasureSources.mkString("', '")}'] were found.")
} else if (evaluateRule != null) {
evaluateRule.validate()
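A hypothetical sketch (toy names, not the actual `DQConfig` code) of how `repeatedMeasures` could be derived before the assertion above:

```scala
// Hypothetical sketch: collect measure names that occur more than once.
val measureNames = Seq("completeness_measure", "accuracy_measure", "completeness_measure")
val repeatedMeasures = measureNames.groupBy(identity).collect {
  case (name, group) if group.size > 1 => name
}
println(repeatedMeasures.mkString("['", "', '", "']")) // ['completeness_measure']
```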
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index ab35fb6..6698726 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -75,7 +75,7 @@
}
def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
- val path: String = s"/_sql?format=csv"
+ val path: String = "/_sql?format=csv"
info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
val dfOpt =
try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
index 0caac17..cedcfda 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
@@ -98,10 +98,10 @@
*
* @return tuple of records dataframe and metric dataframe
*/
- def impl(): (DataFrame, DataFrame)
+ def impl(dataSource: DataFrame): (DataFrame, DataFrame)
/**
- * Implementation should define validtion checks in this method (if required).
+ * Implementation should define validation checks in this method (if required).
* This method needs to be called explicitly (preferably during measure creation).
*
* Defaults to no-op.
@@ -111,35 +111,18 @@
/**
* Executes this measure specific transformation on input data source.
*
- * @param batchId batch id to append in case of streaming source.
* @return enriched tuple of records dataframe and metric dataframe
*/
- def execute(batchId: Option[Long] = None): (DataFrame, DataFrame) = {
- val (recordsDf, metricDf) = impl()
+ def execute(dataSource: DataFrame): (DataFrame, DataFrame) = {
+ val (recordsDf, metricDf) = impl(dataSource)
val processedRecordDf = preProcessRecords(recordsDf)
val processedMetricDf = preProcessMetrics(metricDf)
- val res = batchId match {
- case Some(batchId) =>
- implicit val bId: Long = batchId
- (appendBatchIdIfAvailable(processedRecordDf), appendBatchIdIfAvailable(processedMetricDf))
- case None => (processedRecordDf, processedMetricDf)
- }
-
- res
+ (processedRecordDf, processedMetricDf)
}
- /**
- * Appends batch id to metrics in case of streaming sources.
- *
- * @param input metric dataframe
- * @param batchId batch id to append
- * @return updated metric dataframe
- */
- private def appendBatchIdIfAvailable(input: DataFrame)(implicit batchId: Long): DataFrame = {
- input.withColumn(BatchId, typedLit[Long](batchId))
- }
+ protected def nullToZero(column: Column): Column = when(column.isNull, 0).otherwise(column)
}
@@ -152,7 +135,8 @@
final val Expression = "expr"
final val MeasureColPrefix = "__measure"
final val Status = "__status"
- final val BatchId = "__batch_id"
+ final val BatchId = "batch_id"
+
final val MeasureName = "measure_name"
final val MeasureType = "measure_type"
final val MetricName = "metric_name"
@@ -164,6 +148,7 @@
final val Total: String = "total"
final val BadRecordDefinition = "bad.record.definition"
final val AllColumns: String = "*"
+ final val RowNumber: String = "__row_number"
final val emptyCol: Column = lit(StringUtils.EMPTY)
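The new `nullToZero` helper guards against aggregates that return null over empty input; a small standalone sketch (toy data assumed) of the failure mode it covers:

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{sum, when}

// Sketch: sum over an empty data set yields null, which would leak into the
// metric output as a null value; nullToZero coerces it to 0.
object NullToZeroSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  def nullToZero(column: Column): Column = when(column.isNull, 0).otherwise(column)

  val empty = Seq.empty[Int].toDF("incomplete")
  empty.agg(sum("incomplete")).show()             // prints null
  empty.agg(nullToZero(sum("incomplete"))).show() // prints 0

  spark.stop()
}
```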
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index a84dff0..71a2ef3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -18,13 +18,11 @@
package org.apache.griffin.measure.execution
import java.util.Date
-import java.util.concurrent.Executors
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
+import scala.concurrent.{ExecutionContextExecutorService, Future}
import scala.util._
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
@@ -55,7 +53,8 @@
*
* @param context Instance of `DQContext`
*/
-case class MeasureExecutor(context: DQContext) extends Loggable {
+case class MeasureExecutor(context: DQContext, ec: ExecutionContextExecutorService)
+ extends Loggable {
/**
* SparkSession for this Griffin Application.
@@ -69,19 +68,6 @@
.getBoolean("spark.griffin.measure.cacheDataSources", defaultValue = true)
/**
- * Size of thread pool for parallel measure execution.
- * Defaults to number of processors available to the spark driver JVM.
- */
- private val numThreads: Int = sparkSession.sparkContext.getConf
- .getInt("spark.griffin.measure.parallelism", Runtime.getRuntime.availableProcessors())
-
- /**
- * Service to handle threaded execution of tasks (measures).
- */
- private implicit val ec: ExecutionContextExecutorService =
- ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))
-
- /**
* Starting point of measure execution.
*
* @param measureParams Object representation(s) of user defined measure(s).
@@ -98,20 +84,18 @@
val dataSourceName = measuresForSource._1
val measureParams = measuresForSource._2
- withCacheIfNecessary(dataSourceName, {
- val dataSource = sparkSession.read.table(dataSourceName)
+ val dataSource = sparkSession.read.table(dataSourceName)
- if (dataSource.isStreaming) {
- // TODO this is a no op as streaming queries need to be registered.
+ if (dataSource.isStreaming) {
+ // TODO this is a no op as streaming queries need to be registered.
- dataSource.writeStream
- .foreachBatch((_, batchId) => {
- executeMeasures(measureParams, Some(batchId))
- })
- } else {
- executeMeasures(measureParams)
- }
- })
+ dataSource.writeStream
+ .foreachBatch((batchDf: Dataset[Row], batchId: Long) => {
+ executeMeasures(batchDf, dataSourceName, measureParams, batchId)
+ })
+ } else {
+ executeMeasures(dataSource, dataSourceName, measureParams)
+ }
})
}
@@ -121,25 +105,31 @@
* After the function is complete, the data source is uncached.
*
* @param dataSourceName name of data source
+ * @param numMeasures number of measures defined on this data source
* @param f function to perform
- * @param measureCountByDataSource number of measures for each data source
* @return
*/
- private def withCacheIfNecessary(dataSourceName: String, f: => Unit)(
- implicit measureCountByDataSource: Map[String, Int]): Unit = {
- val numMeasures = measureCountByDataSource(dataSourceName)
+ private def withCacheIfNecessary(
+ dataSourceName: String,
+ numMeasures: Int,
+ dataSource: DataFrame,
+ f: => Unit): Unit = {
var isCached = false
if (cacheDataSources && numMeasures > 1) {
- info(
- s"Caching data source with name '$dataSourceName' as $numMeasures measures are applied on it.")
- sparkSession.catalog.cacheTable(dataSourceName)
- isCached = true
+ if (!dataSource.isStreaming) {
+ info(
+ s"Caching data source with name '$dataSourceName' as $numMeasures measures are applied on it.")
+ dataSource.persist()
+ isCached = true
+ }
}
f
if (isCached) {
- sparkSession.catalog.uncacheTable(dataSourceName)
+ info(
+ s"Un-Caching data source with name '$dataSourceName' as measure execution is complete for it.")
+ dataSource.unpersist(true)
}
}
@@ -155,49 +145,74 @@
* @param batchId batch id identifying micro batches of streaming sources; defaults to -1 for batch sources.
*/
private def executeMeasures(
+ input: DataFrame,
+ dataSourceName: String,
measureParams: Seq[MeasureParam],
- batchId: Option[Long] = None): Unit = {
- val batchDetailsOpt = batchId.map(bId => s"for batch id $bId").getOrElse(StringUtils.EMPTY)
+ batchId: Long = -1L): Unit = {
+ val numMeasures: Int = measureParams.length
- // define the tasks
- val tasks: Map[String, Future[_]] = (for (i <- measureParams.indices)
- yield {
- val measureParam = measureParams(i)
- val measureName = measureParam.getName
+ withCacheIfNecessary(
+ dataSourceName,
+ numMeasures,
+ input, {
+ import java.util.concurrent.TimeUnit
- (measureName, Future {
- val currentContext = context.cloneDQContext(ContextId(new Date().getTime))
- info(s"Started execution of measure with name '$measureName'")
+ import scala.concurrent.duration.Duration
- val measure = createMeasure(measureParam)
- val (recordsDf, metricsDf) = measure.execute(batchId)
+ // define the tasks
+ val tasks = (for (i <- measureParams.indices)
+ yield {
+ val measureParam = measureParams(i)
+ val measure = createMeasure(measureParam)
+ val measureName = measureParam.getName
- persistMetrics(currentContext, measure, metricsDf)
- persistRecords(currentContext, measure, recordsDf)
+ (measure, Future {
+ info(s"Started execution of measure with name '$measureName'")
- MetricFlushStep(Some(measureParam)).execute(currentContext)
+ val (recordsDf, metricsDf) = measure.execute(input)
+ val currentContext = context.cloneDQContext(ContextId(new Date().getTime))
+
+ persistMetrics(currentContext, measure, metricsDf)
+ persistRecords(currentContext, measure, recordsDf)
+
+ MetricFlushStep(Some(measureParam)).execute(currentContext)
+ }(ec))
+ }).toMap
+
+ tasks.foreach(task => {
+ val measureName = task._1.measureParam.getName
+
+ task._2.onComplete {
+ case Success(_) =>
+ info(
+ s"Successfully executed measure with name '$measureName' on data source with name " +
+ s"'$dataSourceName")
+ case Failure(exception) =>
+ error(
+ s"Error occurred while executing measure with name '$measureName' on data source with name " +
+ s"'$dataSourceName'",
+ exception)
+ }(ec)
})
- }).toMap
- tasks.foreach(task =>
- task._2.onComplete {
- case Success(_) =>
- info(s"Successfully executed measure with name '${task._1}' $batchDetailsOpt")
- case Failure(e) =>
- error(s"Error executing measure with name '${task._1}' $batchDetailsOpt", e)
+ var deadline = Duration(10, TimeUnit.SECONDS).fromNow
+
+ while (!tasks.forall(_._2.isCompleted)) {
+ if (deadline.isOverdue()) {
+ val unfinishedMeasureNames = tasks
+ .filterNot(_._2.isCompleted)
+ .map(_._1.measureParam.getName)
+ .mkString("['", "', '", "']")
+
+ info(s"Measures with name $unfinishedMeasureNames are still executing.")
+ deadline = Duration(10, TimeUnit.SECONDS).fromNow
+ }
+ }
+
+ info(
+ "Completed execution of all measures for data source with " +
+ s"name '${measureParams.head.getDataSource}'.")
})
-
- Thread.sleep(1000)
-
- while (!tasks.forall(_._2.isCompleted)) {
- info(
- s"Measures with name ${tasks.filterNot(_._2.isCompleted).keys.mkString("['", "', '", "']")} " +
- s"are still executing.")
- Thread.sleep(1000)
- }
-
- info(
- s"Completed execution of all measures for data source with name '${measureParams.head.getDataSource}'.")
}
/**
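The rolling-deadline wait loop introduced above can be exercised in isolation; a standalone sketch with dummy futures (names and timings assumed):

```scala
import java.util.concurrent.{Executors, TimeUnit}

import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

// Standalone sketch of the rolling-deadline wait loop above: whenever the 10s
// deadline lapses, the still-running tasks are reported and the deadline resets.
object WaitLoopSketch extends App {
  implicit val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  val tasks = Map(
    "fast_measure" -> Future(Thread.sleep(100L)),
    "slow_measure" -> Future(Thread.sleep(12000L)))

  var deadline = Duration(10, TimeUnit.SECONDS).fromNow
  while (!tasks.forall(_._2.isCompleted)) {
    if (deadline.isOverdue()) {
      val pending = tasks.filterNot(_._2.isCompleted).keys.mkString("['", "', '", "']")
      println(s"Measures with name $pending are still executing.")
      deadline = Duration(10, TimeUnit.SECONDS).fromNow
    }
  }
  ec.shutdown()
}
```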
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
index d33efc8..d9192f7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
@@ -17,16 +17,15 @@
package org.apache.griffin.measure.execution.impl
-import java.util.Locale
-
import io.netty.util.internal.StringUtil
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
-import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.CommonUtils.safeReduce
/**
* Accuracy Measure.
@@ -60,7 +59,7 @@
* @param sourceCol name of source column
* @param refCol name of reference column
*/
- case class AccuracyExpr(sourceCol: String, refCol: String)
+ final case class AccuracyExpr(sourceCol: String, refCol: String)
import AccuracyMeasure._
import Measure._
@@ -100,42 +99,47 @@
*
* @return tuple of records dataframe and metric dataframe
*/
- override def impl(): (DataFrame, DataFrame) = {
- val originalSource = sparkSession.read.table(measureParam.getDataSource)
- val originalCols = originalSource.columns
+ override def impl(input: DataFrame): (DataFrame, DataFrame) = {
+ val originalCols = input.columns
- val dataSource = addColumnPrefix(originalSource, SourcePrefixStr)
+ val dataSource = addColumnPrefix(input, SourcePrefixStr)
val refDataSource =
- addColumnPrefix(sparkSession.read.table(refSource).drop(ConstantColumns.tmst), refPrefixStr)
+ addColumnPrefix(sparkSession.read.table(refSource), refPrefixStr)
- val accuracyExprs = exprOpt.get
+ val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+ val accuracyExprs = expr
.map(toAccuracyExpr)
.distinct
.map(x => AccuracyExpr(s"$SourcePrefixStr${x.sourceCol}", s"$refPrefixStr${x.refCol}"))
- val joinExpr =
+ val joinExpr = safeReduce(
accuracyExprs
- .map(e => col(e.sourceCol) === col(e.refCol))
- .reduce(_ and _)
+ .map(e => col(e.sourceCol) === col(e.refCol)))(_ and _)
val indicatorExpr =
- accuracyExprs
- .map(e => coalesce(col(e.sourceCol), emptyCol) notEqual coalesce(col(e.refCol), emptyCol))
- .reduce(_ or _)
+ safeReduce(
+ accuracyExprs
+ .map(e =>
+ coalesce(col(e.sourceCol), emptyCol) notEqual coalesce(col(e.refCol), emptyCol)))(
+ _ or _)
- val nullExpr = accuracyExprs.map(e => col(e.sourceCol).isNull).reduce(_ or _)
+ val nullExpr = safeReduce(accuracyExprs.map(e => col(e.sourceCol).isNull))(_ or _)
+
+ val cols = accuracyExprs.map(_.refCol).map(col)
+ val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
val recordsDf = removeColumnPrefix(
dataSource
- .join(refDataSource, joinExpr, "left")
+ .join(refDataSource.withColumn(RowNumber, row_number().over(window)), joinExpr, "left")
+ .where(col(RowNumber) === 1 or col(RowNumber).isNull)
.withColumn(valueColumn, when(indicatorExpr or nullExpr, 1).otherwise(0)),
SourcePrefixStr)
.select((originalCols :+ valueColumn).map(col): _*)
val selectCols =
Seq(Total, AccurateStr, InAccurateStr).map(e =>
- map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+ map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
val metricColumn: Column = array(selectCols: _*).as(valueColumn)
val metricDf = recordsDf
@@ -165,8 +169,8 @@
* Validates if the expression is not null and non empty along with some dataset specific validations.
*/
override def validate(): Unit = {
- assert(exprOpt.isDefined, s"'$Expression' must be defined.")
- assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
+ val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+ assert(expr.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
assert(
!StringUtil.isNullOrEmpty(refSource),
@@ -178,12 +182,10 @@
val datasourceName = measureParam.getDataSource
- val dataSourceCols =
- sparkSession.read.table(datasourceName).columns.map(_.toLowerCase(Locale.ROOT)).toSet
- val refDataSourceCols =
- sparkSession.read.table(refSource).columns.map(_.toLowerCase(Locale.ROOT)).toSet
+ val dataSourceCols = sparkSession.read.table(datasourceName).columns.toSet
+ val refDataSourceCols = sparkSession.read.table(refSource).columns.toSet
- val accuracyExpr = exprOpt.get.map(toAccuracyExpr).distinct
+ val accuracyExpr = expr.map(toAccuracyExpr).distinct
val (forDataSource, forRefDataSource) =
accuracyExpr
.map(
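The `row_number` window added to the reference side of the join can be seen in isolation; a sketch with toy source/reference data (all names assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Sketch with toy data of the reference-side de-duplication above: duplicate
// reference rows would otherwise multiply matched source rows in the left join,
// inflating the accuracy counts.
object AccuracyDedupSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  val source = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "v")
  val ref = Seq((1, "x"), (1, "x"), (2, "y")).toDF("ref_id", "ref_v")

  val window = Window.partitionBy(col("ref_id")).orderBy(col("ref_id"))
  val dedupedRef = ref.withColumn("__row_number", row_number().over(window))

  source
    .join(dedupedRef, col("id") === col("ref_id"), "left")
    .where(col("__row_number") === 1 or col("__row_number").isNull)
    .show() // id=1 matches exactly one reference row; id=3 keeps its null row
}
```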
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index fbbadc6..fabe166 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -39,15 +39,10 @@
case class CompletenessMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
+ import CompletenessMeasure._
import Measure._
/**
- * Completeness Constants
- */
- final val Complete: String = "complete"
- final val InComplete: String = "incomplete"
-
- /**
* Completeness measure supports record and metric write
*/
override val supportsRecordWrite: Boolean = true
@@ -80,16 +75,15 @@
*
* @return tuple of records dataframe and metric dataframe
*/
- override def impl(): (DataFrame, DataFrame) = {
- val exprStr = exprOpt.get
+ override def impl(input: DataFrame): (DataFrame, DataFrame) = {
+ val exprStr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
val selectCols =
Seq(Total, Complete, InComplete).map(e =>
- map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+ map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
val metricColumn: Column = array(selectCols: _*).as(valueColumn)
- val input = sparkSession.read.table(measureParam.getDataSource)
- val badRecordsDf = input.withColumn(valueColumn, when(expr(exprStr), 1).otherwise(0))
+ val badRecordsDf = input.withColumn(valueColumn, when(expr(exprStr), 0).otherwise(1))
val metricDf = badRecordsDf
.withColumn(Total, lit(1))
@@ -104,9 +98,16 @@
* Validates if expression is defined and is non empty.
*/
override def validate(): Unit = {
- assert(exprOpt.isDefined, s"'$Expression' must be defined.")
- assert(exprOpt.nonEmpty, s"'$Expression' must not be empty.")
-
- assert(!StringUtil.isNullOrEmpty(exprOpt.get), s"'$Expression' must not be null or empty.")
+ val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+ assert(!StringUtil.isNullOrEmpty(expr), s"'$Expression' must not be null or empty.")
}
}
+
+object CompletenessMeasure {
+
+ /**
+ * Completeness Constants
+ */
+ final val Complete: String = "complete"
+ final val InComplete: String = "incomplete"
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
index 359a80e..12588cd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasure.scala
@@ -26,6 +26,7 @@
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
+import org.apache.griffin.measure.utils.CommonUtils.safeReduce
/**
* Duplication Measure.
@@ -95,38 +96,33 @@
*
* @return tuple of records dataframe and metric dataframe
*/
- override def impl(): (DataFrame, DataFrame) = {
- val input = sparkSession.read.table(measureParam.getDataSource)
+ override def impl(input: DataFrame): (DataFrame, DataFrame) = {
val cols = keyCols(input).map(col)
- val isNullCol = cols.map(x => x.isNull).reduce(_ and _)
- val duplicateCol = when(col(__Temp) > 1, 1).otherwise(0)
- val uniqueCol = when(not(isNullCol) and col(Unique) === 1, 1).otherwise(0)
- val distinctCol =
- when(not(isNullCol) and (col(Unique) === 1 or col(NonUnique) === 1), 1).otherwise(0)
- val nonUniqueCol =
- when(not(isNullCol) and col(Unique) =!= 1 and (col(__Temp) - col(NonUnique) === 0), 1)
- .otherwise(0)
+ val isNullCol = safeReduce(cols.map(x => x.isNull))(_ and _)
+ val uniqueCol = condition(col(Count) === 1)
+ val nonUniqueCol = condition(col(Count) > 1 and col(RowNumber) === 1)
+ val duplicateCol = condition(col(Count) > 1 and col(RowNumber) > 1)
+ val distinctCol = condition(col(Unique) === 1 or col(NonUnique) === 1)
val window = Window.partitionBy(cols: _*).orderBy(cols: _*)
val aggDf = input
- .select(col(AllColumns), row_number().over(window).as(__Temp))
- .withColumn(IsNull, isNullCol)
- .withColumn(Duplicate, duplicateCol)
- .withColumn(Unique, count(lit(1)).over(window))
+ .withColumn(IsNotNull, not(isNullCol))
+ .withColumn(RowNumber, row_number().over(window))
+ .withColumn(Count, count(lit(1)).over(window))
.withColumn(Unique, uniqueCol)
- .withColumn(NonUnique, min(__Temp).over(window))
.withColumn(NonUnique, nonUniqueCol)
+ .withColumn(Duplicate, duplicateCol)
.withColumn(Distinct, distinctCol)
.withColumn(Total, lit(1))
.withColumn(valueColumn, col(badnessExpr))
- .drop(__Temp, IsNull)
+ .drop(IsNotNull, RowNumber, Count)
val metricAggCols = duplicationMeasures.map(m => sum(m).as(m))
val selectCols = duplicationMeasures.map(e =>
- map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+ map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
val metricColumn: Column = array(selectCols: _*).as(valueColumn)
val metricDf = aggDf
@@ -160,6 +156,11 @@
}, s"Invalid value '$badnessExpr' was provided for $BadRecordDefinition")
}
+ private def condition(c: Column, checkNotNull: Boolean = true): Column = {
+ val notNullExpr = if (checkNotNull) col(IsNotNull) else lit(true)
+ when(notNullExpr and c, 1).otherwise(0)
+ }
+
private def keyCols(input: DataFrame): Array[String] = {
if (StringUtil.isNullOrEmpty(exprs)) input.columns
else exprs.split(",").map(_.trim)
@@ -171,10 +172,11 @@
* Duplication measure constants
*/
object DuplicationMeasure {
- final val IsNull: String = "is_null"
final val Duplicate: String = "duplicate"
final val Unique: String = "unique"
final val NonUnique: String = "non_unique"
final val Distinct: String = "distinct"
- final val __Temp: String = "__temp"
+
+ final val IsNotNull: String = "is_not_null"
+ final val Count: String = "count"
}
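The rewritten duplication flags are easier to follow on toy data; a standalone sketch (data assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit, row_number, when}

// Sketch with toy data of the rewritten duplication flags above:
//   unique:     the key occurs exactly once
//   non_unique: first occurrence of a repeated key
//   duplicate:  every later occurrence of a repeated key
//   distinct:   unique or non_unique, i.e. one representative per key
object DuplicationFlagsSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  val df = Seq("a", "a", "b", "c", "c", "c").toDF("name")
  val window = Window.partitionBy(col("name")).orderBy(col("name"))

  df.withColumn("count", count(lit(1)).over(window))
    .withColumn("row_number", row_number().over(window))
    .withColumn("unique", when(col("count") === 1, 1).otherwise(0))
    .withColumn("non_unique", when(col("count") > 1 and col("row_number") === 1, 1).otherwise(0))
    .withColumn("duplicate", when(col("count") > 1 and col("row_number") > 1, 1).otherwise(0))
    .withColumn("distinct", when(col("unique") === 1 or col("non_unique") === 1, 1).otherwise(0))
    .show()
}
```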
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
index 52d8bff..fedfe37 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
@@ -17,8 +17,7 @@
package org.apache.griffin.measure.execution.impl
-import java.util.Locale
-
+import io.netty.util.internal.StringUtil
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
@@ -26,7 +25,6 @@
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
import org.apache.griffin.measure.execution.Measure._
-import org.apache.griffin.measure.step.builder.ConstantColumns
/**
* Profiling measure.
@@ -71,12 +69,12 @@
* profiling measure is to be executed. `expr` is an optional key for Profiling measure,
* i.e., if it is not defined, all columns in the data set will be profiled.
*/
- val exprOpt: Option[String] = Option(getFromConfig[String](Expression, null))
+ val exprs: String = getFromConfig[String](Expression, null)
/**
- * The value for this key is boolean. If this is `true`, the distinct counts will be approximated
- * to allow up to 5% error. Approximate counts are usually faster by are less accurate. If this is set
- * to `false`, then the counts will be 100% accurate.
+ * Several resultant metrics of profiling measure are floating-point numbers. This key controls the extent
+ * to which these floating-point numbers are rounded. For example, if `round.scale = 2` then all
+ * floating-point metric values will be rounded to 2 decimal places.
*/
val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)
@@ -91,13 +89,15 @@
val dataSetSample: Double = getFromConfig[java.lang.Double](DataSetSampleStr, 1.0)
/**
- * Several resultant metrics of profiling measure are floating-point numbers. This key controls to extent
- * to which these floating-point numbers are rounded. For example, if `round.scale = 2` then all
- * floating-point metric values will be rounded to 2 decimal places.
+ * The value for this key is boolean. If this is `true`, the distinct counts will be approximated
+ * to allow up to 5% error. Approximate counts are usually faster but less accurate. If this is set
+ * to `false`, then the counts will be 100% accurate.
*/
val approxDistinctCount: Boolean =
getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)
+ validate()
+
/**
* Various metrics are calculated for columns of the data set. If expr is correctly defined,
* then metrics are generated for only the given subset of columns else, its generated for all columns.
@@ -118,26 +118,17 @@
*
* @return tuple of records dataframe and metric dataframe
*/
- override def impl(): (DataFrame, DataFrame) = {
+ override def impl(dataSource: DataFrame): (DataFrame, DataFrame) = {
info(s"Selecting random ${dataSetSample * 100}% of the rows for profiling.")
- val input = sparkSession.read.table(measureParam.getDataSource).sample(dataSetSample)
- val profilingColNames = exprOpt
- .getOrElse(input.columns.mkString(","))
- .split(",")
- .map(_.trim.toLowerCase(Locale.ROOT))
- .toSet
+ val input = dataSource.sample(dataSetSample)
+ val profilingColNames = keyCols(input)
- val profilingCols =
- input.schema.fields.filter(f =>
- profilingColNames.contains(f.name) && !f.name.equalsIgnoreCase(ConstantColumns.tmst))
-
- assert(
- profilingCols.nonEmpty,
- s"Invalid columns [${profilingCols.map(_.name).mkString(", ")}] were provided for profiling.")
+ val profilingCols = input.schema.fields.filter(f => profilingColNames.contains(f.name))
val profilingExprs = profilingCols.foldLeft(Array.empty[Column])((exprList, field) => {
val colName = field.name
- val profilingExprs = getProfilingExprs(field, roundScale, approxDistinctCount)
+ val profilingExprs =
+ getProfilingExprs(field, roundScale, approxDistinctCount, dataSetSample).map(nullToZero)
exprList.:+(map(profilingExprs: _*).as(s"$DetailsPrefix$colName"))
})
@@ -165,6 +156,24 @@
(sparkSession.emptyDataFrame, metricDf)
}
+ override def validate(): Unit = {
+ val input = sparkSession.read.table(measureParam.getDataSource)
+ val kc = keyCols(input)
+
+ assert(kc.nonEmpty, s"Columns defined in '$Expression' must not be empty.")
+ kc.foreach(c =>
+ assert(input.columns.contains(c), s"Provided column '$c' does not exist in the dataset."))
+
+ assert(
+ dataSetSample > 0.0d && dataSetSample <= 1.0d,
+ "Sample fraction of rows must be in range (0.0, 1.0].")
+ }
+
+ private def keyCols(input: DataFrame): Array[String] = {
+ if (StringUtil.isNullOrEmpty(exprs)) input.columns
+ else exprs.split(",").map(_.trim)
+ }.distinct
+
}
/**
@@ -227,7 +236,8 @@
private def getProfilingExprs(
field: StructField,
roundScale: Int,
- approxDistinctCount: Boolean): Seq[Column] = {
+ approxDistinctCount: Boolean,
+ dataSetSample: Double): Seq[Column] = {
val colName = field.name
val colType = field.dataType
@@ -243,12 +253,16 @@
(lit(DistinctCount), countDistinct(column).as(DistinctCount))
}
+ val distinctExpr = if (dataSetSample == 1) {
+ Seq(lit(distinctCountName), distinctCountExpr)
+ } else Nil
+
Seq(
Seq(lit(DataTypeStr), lit(colType.catalogString).as(DataTypeStr)),
Seq(lit(Total), sum(lit(1)).as(Total)),
Seq(lit(MinColLength), min(lengthColExpr).as(MinColLength)),
Seq(lit(MaxColLength), max(lengthColExpr).as(MaxColLength)),
- Seq(lit(AvgColLength), forNumericFn(colType, avg(lengthColExpr), AvgColLength)),
+ Seq(lit(AvgColLength), avg(lengthColExpr).as(AvgColLength)),
Seq(lit(Min), forNumericFn(colType, min(column), Min)),
Seq(lit(Max), forNumericFn(colType, max(column), Max)),
Seq(lit(Avg), forNumericFn(colType, bround(avg(column), roundScale), Avg)),
@@ -257,7 +271,7 @@
forNumericFn(colType, bround(stddev(column), roundScale), StdDeviation)),
Seq(lit(Variance), forNumericFn(colType, bround(variance(column), roundScale), Variance)),
Seq(lit(Kurtosis), forNumericFn(colType, bround(kurtosis(column), roundScale), Kurtosis)),
- Seq(lit(distinctCountName), distinctCountExpr),
+ distinctExpr,
Seq(lit(NullCount), sum(nullColExpr).as(NullCount))).flatten
}
}
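A sketch (toy data assumed) of the two distinct-count strategies and the `round.scale` behaviour configured above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{approx_count_distinct, avg, bround, col, countDistinct}

// Sketch: the approximate distinct count trades up to ~5% error for speed,
// and bround rounds floating-point metrics to the configured scale.
object ProfilingExprSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  val df = (1 to 1000).map(_ % 37).toDF("value")

  df.agg(
      countDistinct(col("value")).as("exact_distinct"),
      approx_count_distinct(col("value"), rsd = 0.05).as("approx_distinct"),
      bround(avg(col("value")), 3).as("avg_rounded"))
    .show()
}
```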
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
index 2111d6a..3247b9b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasure.scala
@@ -17,14 +17,13 @@
package org.apache.griffin.measure.execution.impl
-import java.util.Locale
-
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure
+import org.apache.griffin.measure.utils.CommonUtils.safeReduce
/**
* SchemaConformance Measure.
@@ -43,6 +42,7 @@
case class SchemaConformanceMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
+ import CompletenessMeasure._
import Measure._
/**
@@ -51,15 +51,13 @@
* @param sourceCol name of source column
* @param dataType name of reference column
*/
- case class SchemaConformanceExpr(sourceCol: String, dataType: DataType)
+ final case class SchemaConformanceExpr(sourceCol: String, dataType: DataType)
/**
* SchemaConformance Constants
*/
final val SourceColStr: String = "source.col"
final val DataTypeStr: String = "type"
- final val Complete: String = "complete"
- final val InComplete: String = "incomplete"
/**
* SchemaConformance measure supports record and metric write
@@ -90,19 +88,21 @@
*
* @return tuple of records dataframe and metric dataframe
*/
- override def impl(): (DataFrame, DataFrame) = {
- val givenExprs = exprOpt.get.map(toSchemaConformanceExpr).distinct
+ override def impl(input: DataFrame): (DataFrame, DataFrame) = {
+ val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+ val givenExprs = expr.map(toSchemaConformanceExpr).distinct
- val incompleteExpr = givenExprs
- .map(e => when(col(e.sourceCol).cast(e.dataType).isNull, true).otherwise(false))
- .reduce(_ or _)
+ val incompleteExpr = safeReduce(
+ givenExprs
+ .map(e =>
+ when(col(e.sourceCol).cast(StringType).cast(e.dataType).isNull, true)
+ .otherwise(false)))(_ or _)
val selectCols =
Seq(Total, Complete, InComplete).map(e =>
- map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+ map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
val metricColumn: Column = array(selectCols: _*).as(valueColumn)
- val input = sparkSession.read.table(measureParam.getDataSource)
val badRecordsDf = input.withColumn(valueColumn, when(incompleteExpr, 1).otherwise(0))
val metricDf = badRecordsDf
@@ -132,14 +132,13 @@
* Validates if the expression is not null and non empty along with some dataset specific validations.
*/
override def validate(): Unit = {
- assert(exprOpt.isDefined, s"'$Expression' must be defined.")
- assert(exprOpt.get.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
+ val expr = exprOpt.getOrElse(throw new AssertionError(s"'$Expression' must be defined."))
+ assert(expr.flatten.nonEmpty, s"'$Expression' must not be empty or of invalid type.")
val datasourceName = measureParam.getDataSource
- val dataSourceCols =
- sparkSession.read.table(datasourceName).columns.map(_.toLowerCase(Locale.ROOT)).toSet
- val schemaConformanceExprExpr = exprOpt.get.map(toSchemaConformanceExpr).distinct
+ val dataSourceCols = sparkSession.read.table(datasourceName).columns.toSet
+ val schemaConformanceExprExpr = expr.map(toSchemaConformanceExpr).distinct
val forDataSource =
schemaConformanceExprExpr.map(e => (e.sourceCol, dataSourceCols.contains(e.sourceCol)))
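The cast-through-string conformance check added above can be demonstrated on toy data (names assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.types.{DateType, StringType}

// Sketch with toy data: a value conforms to the target type iff casting it
// (via string) yields a non-null result.
object SchemaConformanceSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  val df = Seq("2009-07-30", "not-a-date").toDF("test_date")

  df.withColumn(
      "__status",
      when(col("test_date").cast(StringType).cast(DateType).isNull, "bad").otherwise("good"))
    .show() // 2009-07-30 -> good, not-a-date -> bad
}
```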
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
index 0cdc090..2e79257 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/SparkSQLMeasure.scala
@@ -43,15 +43,10 @@
case class SparkSQLMeasure(sparkSession: SparkSession, measureParam: MeasureParam)
extends Measure {
+ import CompletenessMeasure._
import Measure._
/**
- * SparkSQL measure constants
- */
- final val Complete: String = "complete"
- final val InComplete: String = "incomplete"
-
- /**
* SparkSQL measure supports record and metric write
*/
override val supportsRecordWrite: Boolean = true
@@ -87,8 +82,8 @@
*
* @return tuple of records dataframe and metric dataframe
*/
- override def impl(): (DataFrame, DataFrame) = {
- val df = sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))
+ override def impl(dataSource: DataFrame): (DataFrame, DataFrame) = {
+ val df = dataSource.sparkSession.sql(expr).withColumn(valueColumn, sparkExpr(badnessExpr))
assert(
df.schema.exists(f => f.name.matches(valueColumn) && f.dataType.isInstanceOf[BooleanType]),
@@ -96,7 +91,7 @@
val selectCols =
Seq(Total, Complete, InComplete).map(e =>
- map(lit(MetricName), lit(e), lit(MetricValue), col(e).cast(StringType)))
+ map(lit(MetricName), lit(e), lit(MetricValue), nullToZero(col(e).cast(StringType))))
val metricColumn: Column = array(selectCols: _*).as(valueColumn)
val badRecordsDf = df.withColumn(valueColumn, when(col(valueColumn), 1).otherwise(0))
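A sketch of the SparkSQL measure contract (table and measure names assumed) matching the `__measure_{measure_name}` alias documented in sparksql.md:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

// Sketch: the user query exposes a boolean column aliased to
// __measure_{measure_name}, which is then folded into 0/1 bad-record flags.
object SparkSqlMeasureSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  Seq(("90210", "LA"), (null, "SF")).toDF("zipcode", "city").createOrReplaceTempView("source")

  val df = spark.sql("SELECT *, zipcode IS NULL AS __measure_spark_sql_measure FROM source")

  df.withColumn(
      "__measure_spark_sql_measure",
      when(col("__measure_spark_sql_measure"), 1).otherwise(0))
    .show()
}
```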
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 48231ac..6d6200a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -17,8 +17,9 @@
package org.apache.griffin.measure.launch.batch
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Executors, TimeUnit}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.util.Try
import org.apache.spark.SparkConf
@@ -85,7 +86,18 @@
if (dqParam.getMeasures != null && dqParam.getMeasures.nonEmpty) {
Try {
- MeasureExecutor(dqContext).execute(dqParam.getMeasures)
+ // Size of thread pool for parallel measure execution.
+ // Defaults to number of processors available to the spark driver JVM.
+ val numThreads: Int = sparkSession.sparkContext.getConf
+ .getInt(
+ "spark.griffin.measure.parallelism",
+ Runtime.getRuntime.availableProcessors())
+
+ // Service to handle threaded execution of tasks (measures).
+ val ec: ExecutionContextExecutorService =
+ ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))
+
+ MeasureExecutor(dqContext, ec).execute(dqParam.getMeasures)
true
}
} else {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
index 268b3a0..08bfb9b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
@@ -41,4 +41,17 @@
result
}
+
+ /**
+  * Reduces an `Iterable` without calling `reduce`, which scapegoat's
+  * `UnsafeTraversableMethods` inspection flags as unsafe. The input must be
+  * non-empty; an empty input throws a `scala.MatchError`.
+  *
+  * @param t non-empty `Iterable` to reduce
+  * @param op binary reduction operation
+  * @tparam T1 element type
+  * @return result of reducing `t` with `op`
+  */
+ def safeReduce[T1](t: Iterable[T1])(op: (T1, T1) => T1): T1 = {
+ val head :: tail = t.toList
+ tail.fold(head)(op)
+ }
+
}
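A usage sketch of `safeReduce`, mirroring how the measures above reduce sequences of `Column` conditions without tripping the `UnsafeTraversableMethods` check:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Usage sketch: like reduce for non-empty input, but avoids the unsafe
// Traversable methods flagged by the scapegoat configuration in the pom.
object SafeReduceSketch extends App {
  def safeReduce[T1](t: Iterable[T1])(op: (T1, T1) => T1): T1 = {
    val head :: tail = t.toList
    tail.fold(head)(op)
  }

  assert(safeReduce(Seq(1, 2, 3))(_ + _) == 6)
  // safeReduce(Seq.empty[Int])(_ + _) would throw a scala.MatchError

  // Reducing Column predicates, as the measures above do:
  val conditions: Seq[Column] = Seq(col("zipcode").isNull, col("city").isNull)
  val anyNull: Column = safeReduce(conditions)(_ or _)
  println(anyNull) // (zipcode IS NULL OR city IS NULL)
}
```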
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
index 2a7f6a4..9f0f748 100644
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -21,7 +21,7 @@
"type": "completeness",
"data.source": "source",
"config": {
- "expr": "email is null or post_code is null or first_name is null"
+ "expr": "email is not null and post_code is not null and first_name is not null"
},
"out": [
{
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
index 2f81d1c..206761f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
@@ -136,7 +136,7 @@
it should "execute defined measure expr" in {
val measure = AccuracyMeasure(spark, param)
- val (recordsDf, metricsDf) = measure.execute(None)
+ val (recordsDf, metricsDf) = measure.execute(source)
assertResult(recordDfSchema)(recordsDf.schema)
assertResult(metricDfSchema)(metricsDf.schema)
@@ -155,4 +155,27 @@
assertResult(metricMap(InAccurateStr))("3")
}
+ it should "support space in column name" in {
+ val newSource = source.withColumnRenamed("name", "first name")
+ newSource.createOrReplaceTempView("newSource")
+ val measure = AccuracyMeasure(spark, param)
+ val (recordsDf, metricsDf) = measure.execute(newSource)
+
+ assertResult(newSource.schema.add(Status, "string", nullable = false))(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
+
+ assertResult(newSource.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
+
+ val metricMap = metricsDf
+ .head()
+ .getAs[Seq[Map[String, String]]](Metrics)
+ .map(x => x(MetricName) -> x(MetricValue))
+ .toMap
+
+ assertResult(metricMap(Total))("5")
+ assertResult(metricMap(AccurateStr))("2")
+ assertResult(metricMap(InAccurateStr))("3")
+ }
+
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
index d6e8b74..db0127c 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
@@ -23,13 +23,14 @@
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.execution.impl.CompletenessMeasure._
class CompletenessMeasureTest extends MeasureTest {
var param: MeasureParam = _
override def beforeAll(): Unit = {
super.beforeAll()
- param = MeasureParam("param", "Completeness", "source", Map(Expression -> "name is null"))
+ param = MeasureParam("param", "Completeness", "source", Map(Expression -> "name is not null"))
}
"CompletenessMeasure" should "validate expression config" in {
@@ -61,8 +62,8 @@
}
it should "execute defined measure expr" in {
- val measure = CompletenessMeasure(spark, param)
- val (recordsDf, metricsDf) = measure.execute(None)
+ val measure = new CompletenessMeasure(spark, param)
+ val (recordsDf, metricsDf) = measure.execute(source)
assertResult(recordDfSchema)(recordsDf.schema)
assertResult(metricDfSchema)(metricsDf.schema)
@@ -77,15 +78,15 @@
.toMap
assertResult(metricMap(Total))("5")
- assertResult(metricMap(measure.Complete))("4")
- assertResult(metricMap(measure.InComplete))("1")
+ assertResult(metricMap(Complete))("4")
+ assertResult(metricMap(InComplete))("1")
}
it should "supported complex measure expr" in {
- val measure = CompletenessMeasure(
+ val measure = new CompletenessMeasure(
spark,
- param.copy(config = Map(Expression -> "name is null or gender is null")))
- val (recordsDf, metricsDf) = measure.execute(None)
+ param.copy(config = Map(Expression -> "name is not null and gender is not null")))
+ val (recordsDf, metricsDf) = measure.execute(source)
assertResult(recordDfSchema)(recordsDf.schema)
assertResult(metricDfSchema)(metricsDf.schema)
@@ -100,19 +101,19 @@
.toMap
assertResult(metricMap(Total))("5")
- assertResult(metricMap(measure.Complete))("3")
- assertResult(metricMap(measure.InComplete))("2")
+ assertResult(metricMap(Complete))("3")
+ assertResult(metricMap(InComplete))("2")
}
it should "throw runtime error for invalid expr" in {
assertThrows[AnalysisException] {
- CompletenessMeasure(spark, param.copy(config = Map(Expression -> "xyz is null")))
- .execute()
+ new CompletenessMeasure(spark, param.copy(config = Map(Expression -> "xyz is null")))
+ .execute(source)
}
assertThrows[ParseException] {
- CompletenessMeasure(spark, param.copy(config = Map(Expression -> "select 1")))
- .execute()
+ new CompletenessMeasure(spark, param.copy(config = Map(Expression -> "select 1")))
+ .execute(source)
}
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
index 7e22617..f807434 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -107,7 +107,7 @@
it should "execute defined measure expr" in {
val measure =
DuplicationMeasure(spark, param.copy(config = Map(BadRecordDefinition -> "duplicate")))
- val (recordsDf, metricsDf) = measure.execute(None)
+ val (recordsDf, metricsDf) = measure.execute(source)
assertResult(recordDfSchema)(recordsDf.schema)
assertResult(metricDfSchema)(metricsDf.schema)
@@ -128,11 +128,11 @@
assertResult(metricMap(Total))("5")
}
- it should "supported complex measure expr" in {
+ it should "support complex measure expr" in {
val measure = DuplicationMeasure(
spark,
param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "duplicate")))
- val (recordsDf, metricsDf) = measure.execute(None)
+ val (recordsDf, metricsDf) = measure.execute(source)
assertResult(recordDfSchema)(recordsDf.schema)
assertResult(metricDfSchema)(metricsDf.schema)
@@ -153,4 +153,34 @@
assertResult(metricMap(Total))("5")
}
+ it should "support columns with space in their names" in {
+ val newSource = source.withColumnRenamed("name", "full name")
+ newSource.createOrReplaceTempView("newSource")
+ val measure = DuplicationMeasure(
+ spark,
+ param.copy(
+ dataSource = "newSource",
+ config = Map(Expression -> "full name", BadRecordDefinition -> "duplicate")))
+
+ val (recordsDf, metricsDf) = measure.execute(newSource)
+
+ assertResult(newSource.schema.add(Status, "string", nullable = false))(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
+
+ assertResult(newSource.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
+
+ val metricMap = metricsDf
+ .head()
+ .getAs[Seq[Map[String, String]]](Metrics)
+ .map(x => x(MetricName) -> x(MetricValue))
+ .toMap
+
+ assertResult(metricMap(Duplicate))("1")
+ assertResult(metricMap(Unique))("2")
+ assertResult(metricMap(NonUnique))("1")
+ assertResult(metricMap(Distinct))("3")
+ assertResult(metricMap(Total))("5")
+ }
+
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
index e4f1b21..d32730f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
@@ -70,7 +70,7 @@
assertResult(3)(measure.roundScale)
assertResult(true)(measure.approxDistinctCount)
- val (_, metricsDf) = measure.execute(None)
+ val (_, metricsDf) = measure.execute(source)
assertResult(1L)(metricsDf.count())
@@ -86,7 +86,7 @@
assertResult(3)(measure.roundScale)
assertResult(false)(measure.approxDistinctCount)
- val (_, metricsDf) = measure.execute(None)
+ val (_, metricsDf) = measure.execute(source)
assertResult(1L)(metricsDf.count())
@@ -97,7 +97,7 @@
it should "throw runtime error for invalid expr" in {
assertThrows[AssertionError] {
ProfilingMeasure(spark, param.copy(config = Map(Expression -> "xyz")))
- .execute()
+ .execute(source)
}
}
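The `approxDistinctCount` flag asserted above presumably switches the distinct-count statistic between Spark's HyperLogLog-based estimate and an exact count. A sketch under that assumption (not the actual measure internals):

    import org.apache.spark.sql.functions.{approx_count_distinct, col, countDistinct}

    def distinctAgg(approx: Boolean) =
      if (approx) approx_count_distinct(col("first_name"))
      else countDistinct(col("first_name"))
    // source.agg(distinctAgg(approx = true)) would feed the
    // "approx_distinct_count" metric asserted in BatchDQAppTest below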
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala
index e705675..6c37f34 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SchemaConformanceMeasureTest.scala
@@ -18,9 +18,11 @@
package org.apache.griffin.measure.execution.impl
import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.functions.lit
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.execution.impl.CompletenessMeasure._
class SchemaConformanceMeasureTest extends MeasureTest {
var param: MeasureParam = _
@@ -102,7 +104,7 @@
it should "execute defined measure expr" in {
val measure = SchemaConformanceMeasure(spark, param)
- val (recordsDf, metricsDf) = measure.execute(None)
+ val (recordsDf, metricsDf) = measure.execute(source)
assertResult(recordDfSchema)(recordsDf.schema)
assertResult(metricDfSchema)(metricsDf.schema)
@@ -117,8 +119,65 @@
.toMap
assertResult(metricMap(Total))("5")
- assertResult(metricMap(measure.Complete))("5")
- assertResult(metricMap(measure.InComplete))("0")
+ assertResult(metricMap(Complete))("5")
+ assertResult(metricMap(InComplete))("0")
+ }
+
+ it should "be able to check for date column" in {
+ val newSource = source.withColumn("Test_date", lit("2009-07-30"))
+ newSource.createOrReplaceTempView("newSource")
+ val measure = SchemaConformanceMeasure(
+ spark,
+ param.copy(
+ dataSource = "newSource",
+ config = Map(Expression -> Seq(Map(SourceColStr -> "Test_date", DataTypeStr -> "date")))))
+ val (recordsDf, metricsDf) = measure.execute(newSource)
+
+ val newRecordDfSchema = newSource.schema.add(Status, "string", nullable = false)
+ assertResult(newRecordDfSchema)(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
+
+ assertResult(newSource.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
+
+ val metricMap = metricsDf
+ .head()
+ .getAs[Seq[Map[String, String]]](Metrics)
+ .map(x => x(MetricName) -> x(MetricValue))
+ .toMap
+
+ assertResult(metricMap(Total))("5")
+ assertResult(metricMap(Complete))("5")
+ assertResult(metricMap(InComplete))("0")
+ }
+
+ it should "be able to check for timestamp column and support space in column name" in {
+ val newSource = source.withColumn("Test date", lit("2009-07-30"))
+ newSource.createOrReplaceTempView("newSource")
+ val measure = SchemaConformanceMeasure(
+ spark,
+ param.copy(
+ dataSource = "newSource",
+ config =
+ Map(Expression -> Seq(Map(SourceColStr -> "Test date", DataTypeStr -> "timestamp")))))
+ val (recordsDf, metricsDf) = measure.execute(newSource)
+
+ val newRecordDfSchema = newSource.schema.add(Status, "string", nullable = false)
+ assertResult(newRecordDfSchema)(recordsDf.schema)
+ assertResult(metricDfSchema)(metricsDf.schema)
+
+ assertResult(newSource.count())(recordsDf.count())
+ assertResult(1L)(metricsDf.count())
+
+ val metricMap = metricsDf
+ .head()
+ .getAs[Seq[Map[String, String]]](Metrics)
+ .map(x => x(MetricName) -> x(MetricValue))
+ .toMap
+
+ assertResult(metricMap(Total))("5")
+ assertResult(metricMap(Complete))("5")
+ assertResult(metricMap(InComplete))("0")
}
}
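Both new conformance tests feed the string literal "2009-07-30" and expect every row to conform (`Complete` = 5). A plausible mechanism, assumed here rather than taken from the measure's source, is a cast that yields null on failure:

    import org.apache.spark.sql.functions.{col, when}

    // rows whose value casts cleanly are "good"; "2009-07-30" casts to
    // both DATE and TIMESTAMP (midnight), so all 5 rows pass
    val status = when(col("Test date").cast("timestamp").isNotNull, "good")
      .otherwise("bad")
    val recordsDf = newSource.withColumn("__status", status)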
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
index 405e4a6..a2986e0 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
@@ -22,6 +22,7 @@
import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
import org.apache.griffin.measure.execution.Measure._
+import org.apache.griffin.measure.execution.impl.CompletenessMeasure._
class SparkSqlMeasureTest extends MeasureTest {
var param: MeasureParam = _
@@ -99,7 +100,7 @@
it should "execute defined measure expr" in {
val measure = SparkSQLMeasure(spark, param)
- val (recordsDf, metricsDf) = measure.execute(None)
+ val (recordsDf, metricsDf) = measure.execute(source)
assertResult(metricDfSchema)(metricsDf.schema)
assertResult(source.count())(recordsDf.count())
@@ -112,8 +113,8 @@
.toMap
assertResult(metricMap(Total))("5")
- assertResult(metricMap(measure.Complete))("3")
- assertResult(metricMap(measure.InComplete))("2")
+ assertResult(metricMap(Complete))("3")
+ assertResult(metricMap(InComplete))("2")
}
it should "throw runtime error for invalid expr" in {
@@ -122,7 +123,7 @@
spark,
param.copy(config =
Map(Expression -> "select * from source", BadRecordDefinition -> "name")))
- .execute()
+ .execute(source)
}
assertThrows[AnalysisException] {
@@ -130,7 +131,7 @@
spark,
param.copy(config =
Map(Expression -> "select 1 as my_value", BadRecordDefinition -> "name is null")))
- .execute()
+ .execute(source)
}
}
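The second failure case here is instructive: `select 1 as my_value` produces a result set without the `name` column, so applying the bad-record predicate `name is null` to it cannot resolve, and Spark raises an `AnalysisException` at analysis time:

    // standalone reproduction of the second failure:
    spark.sql("select 1 as my_value").where("name is null")
    // => AnalysisException: cannot resolve '`name`'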
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index b4f11cd..c7d002b 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -147,18 +147,18 @@
"max" -> "10013",
"min_col_len" -> "5"),
"first_name" -> Map(
- "avg_col_len" -> null,
+ "avg_col_len" -> "6.0",
"max_col_len" -> "6",
- "variance" -> null,
- "kurtosis" -> null,
- "avg" -> null,
- "min" -> null,
+ "variance" -> "0",
+ "kurtosis" -> "0",
+ "avg" -> "0",
+ "min" -> "0",
"null_count" -> "0",
"approx_distinct_count" -> "13",
"total" -> "13",
- "std_dev" -> null,
+ "std_dev" -> "0",
"data_type" -> "string",
- "max" -> null,
+ "max" -> "0",
"min_col_len" -> "6"))
runAndCheckResult(Map("profiling_measure" -> expectedMetrics))
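The expected profile for `first_name` (a string column) now reports "0" rather than null for the numeric statistics, plus a concrete `avg_col_len`. One way to get that behaviour, offered purely as a guess at the implementation, is to coalesce null-producing aggregates to zero:

    import org.apache.spark.sql.functions.{avg, coalesce, col, lit, variance}

    // numeric aggregates over non-numeric columns evaluate to null;
    // coalescing yields the "0" values asserted above
    source.agg(
      coalesce(variance(col("first_name")), lit(0)).as("variance"),
      coalesce(avg(col("first_name")), lit(0)).as("avg"))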
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index e24bcd5..5e374c9 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -17,7 +17,7 @@
package org.apache.griffin.measure.job
-import scala.util.{Failure, Success}
+import scala.util._
import org.scalatest.matchers.should._
@@ -38,11 +38,9 @@
var dqApp: DQApp = _
def getConfigFilePath(fileName: String): String = {
- try {
- getClass.getResource(fileName).getFile
- } catch {
- case _: NullPointerException => throw new Exception(s"resource [$fileName] not found")
- case ex: Throwable => throw ex
+ Try(getClass.getResource(fileName).getFile).toOption match {
+ case Some(n) => n
+ case None => throw new Exception(s"resource [$fileName] not found")
}
}
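The rewritten lookup behaves like the old try/catch, though note that `Try` maps any non-fatal failure, not just `NullPointerException`, to the same "resource not found" error. An equivalent, slightly terser spelling:

    import scala.util.Try

    def getConfigFilePath(fileName: String): String =
      Try(getClass.getResource(fileName).getFile)
        .getOrElse(throw new Exception(s"resource [$fileName] not found"))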
diff --git a/pom.xml b/pom.xml
index e51c94e..d2d4ffe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<scala211.binary.version>2.11</scala211.binary.version>
- <scala.version>${scala.binary.version}.0</scala.version>
+ <scala.version>${scala.binary.version}.12</scala.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
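With `scala.binary.version` fixed at 2.11, the `scala.version` property now resolves to 2.11.12, the final patch release of the 2.11 line, rather than the long-superseded 2.11.0.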