[GRIFFIN-358] New Profiling Measure
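This patch implements the new `profiling` measure for the revamped measure execution flow. `ProfilingMeasure` profiles the configured columns (or all columns when `expr` is omitted) in a single aggregation pass: data type, null count, exact or approximate distinct count, and min/max column length for every column, plus avg length, min/max/avg, standard deviation, variance, and kurtosis for numeric columns. The result is emitted as one JSON metric.

Supporting changes:
- the sample batch config gains a profiling measure block and casts `user_id` to `long` during pre-processing,
- record/metric pre-processing moves out of the individual measures into `Measure.execute`, gated by the `supportsRecordWrite`/`supportsMetricWrite` flags,
- `MetricWrapper` gets a `clear()` method so `MetricFlushStep` does not re-flush already-emitted metrics,
- `ConsoleSink` logs through the inherited `info()` helper, and
- `JsonUtil` pretty-prints serialized metrics.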
diff --git a/measure/src/main/resources/config-batch-preproc.json b/measure/src/main/resources/config-batch-preproc.json
index 6de4b4f..c895241 100644
--- a/measure/src/main/resources/config-batch-preproc.json
+++ b/measure/src/main/resources/config-batch-preproc.json
@@ -15,7 +15,7 @@
},
"pre.proc": [
"select split(value, ',') as part from this",
- "select part[0] as user_id, part[1] as first_name, part[2] as last_name, part[3] as address, part[4] as email, part[5] as phone, part[6] as post_code from this"
+ "select cast(part[0] as long) as user_id, part[1] as first_name, part[2] as last_name, part[3] as address, part[4] as email, part[5] as phone, part[6] as post_code from this"
]
}
}
@@ -35,6 +35,26 @@
"flatten": "map"
}
]
+ },
+ {
+ "name": "profiling_measure",
+ "type": "profiling",
+ "data.source": "source",
+ "config": {
+ "expr": [
+ "user_id",
+ "post_code"
+ ],
+ "approx.distinct.count": true,
+ "round.scale": 2
+ },
+ "out": [
+ {
+ "type": "metric",
+ "name": "prof_metric",
+ "flatten": "default"
+ }
+ ]
}
],
"sinks": [
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
index ee2bd28..8d73e1b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/MeasureTypes.scala
@@ -25,7 +25,7 @@
type MeasureType = Value
- val Completeness, SparkSQL = Value
+ val Completeness, Profiling, SparkSQL = Value
override def withNameWithDefault(name: String): MeasureType = {
super.withNameWithDefault(name)
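For illustration, a self-contained sketch of what the enum addition enables: resolving the config string `"profiling"` to a measure type. The lookup below is hypothetical; the real resolution is inherited via `super.withNameWithDefault`.

```scala
// Standalone sketch; Griffin's actual enum base class is not reproduced here.
object MeasureTypesSketch extends Enumeration {
  type MeasureType = Value
  val Completeness, Profiling, SparkSQL = Value

  // Hypothetical case-insensitive lookup standing in for withNameWithDefault.
  def resolve(name: String): Option[MeasureType] =
    values.find(_.toString.equalsIgnoreCase(name))
}

// MeasureTypesSketch.resolve("profiling") contains MeasureTypesSketch.Profiling
```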
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
index c503d6a..30b1738 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
@@ -52,4 +52,6 @@
}
}
+ def clear(): Unit = metrics.clear()
+
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
index c8d4149..a327f0e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/Measure.scala
@@ -37,27 +37,31 @@
val supportsRecordWrite: Boolean
val supportsMetricWrite: Boolean
- final val measureViolationsColName = s"${MeasureColPrefix}_${measureParam.getName}"
+ final val valueColumn = s"${MeasureColPrefix}_${measureParam.getName}"
def getFromConfig[T: ClassTag](key: String, defValue: T): T = {
measureParam.getConfig.getAnyRef[T](key, defValue)
}
def preProcessMetrics(input: DataFrame): DataFrame = {
- val measureType = measureParam.getType.toString.toLowerCase(Locale.ROOT)
+ if (supportsMetricWrite) {
+ val measureType = measureParam.getType.toString.toLowerCase(Locale.ROOT)
- input
- .withColumn(MeasureName, typedLit[String](measureParam.getName))
- .withColumn(MeasureType, typedLit[String](measureType))
- .withColumn("value", col(measureViolationsColName))
- .withColumn("data_source", typedLit[String](measureParam.getDataSource))
- .select(MeasureName, MeasureType, "data_source", "value")
+ input
+ .withColumn(MeasureName, typedLit[String](measureParam.getName))
+ .withColumn(MeasureType, typedLit[String](measureType))
+ .withColumn("value", col(valueColumn))
+ .withColumn("data_source", typedLit[String](measureParam.getDataSource))
+ .select(MeasureName, MeasureType, "data_source", "value")
+ } else input
}
def preProcessRecords(input: DataFrame): DataFrame = {
- input
- .withColumn(Status, when(col(measureViolationsColName) === 0, "good").otherwise("bad"))
- .drop(measureViolationsColName)
+ if (supportsRecordWrite) {
+ input
+ .withColumn(Status, when(col(valueColumn) === 0, "good").otherwise("bad"))
+ .drop(valueColumn)
+ } else input
}
def impl(sparkSession: SparkSession): (DataFrame, DataFrame)
@@ -65,13 +69,16 @@
def execute(sparkSession: SparkSession, batchId: Option[Long]): (DataFrame, DataFrame) = {
val (badRecordsDf, metricDf) = impl(sparkSession)
+ val processedRecordDf = preProcessRecords(badRecordsDf)
+ val processedMetricDf = preProcessMetrics(metricDf)
+
var batchDetailsOpt = StringUtils.EMPTY
val res = batchId match {
case Some(batchId) =>
implicit val bId: Long = batchId
batchDetailsOpt = s"for batch id $bId"
- (appendBatchIdIfAvailable(badRecordsDf), appendBatchIdIfAvailable(metricDf))
- case None => (badRecordsDf, metricDf)
+ (appendBatchIdIfAvailable(processedRecordDf), appendBatchIdIfAvailable(processedMetricDf))
+ case None => (processedRecordDf, processedMetricDf)
}
info(
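The net effect of this refactor: `impl` returns raw record/metric frames, and the base class owns the normalization, skipping it when a measure does not support that output type. A simplified sketch of the new contract (bodies elided; batch-id handling omitted):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Simplified sketch of the refactored Measure contract, not the full trait.
trait MeasureSketch {
  val supportsRecordWrite: Boolean
  val supportsMetricWrite: Boolean

  // Subclasses compute the raw (badRecords, metrics) frames only.
  def impl(spark: SparkSession): (DataFrame, DataFrame)

  // Normalizers are no-ops for outputs the measure does not support.
  def preProcessRecords(input: DataFrame): DataFrame =
    if (supportsRecordWrite) input /* add "status", drop value column */ else input

  def preProcessMetrics(input: DataFrame): DataFrame =
    if (supportsMetricWrite) input /* select the standard metric columns */ else input

  final def execute(spark: SparkSession): (DataFrame, DataFrame) = {
    val (badRecordsDf, metricDf) = impl(spark)
    (preProcessRecords(badRecordsDf), preProcessMetrics(metricDf))
  }
}
```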
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index 7f6eed7..115cc1b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -80,6 +80,7 @@
private def createMeasure(measureParam: MeasureParam): Measure = {
measureParam.getType match {
case MeasureTypes.Completeness => CompletenessMeasure(measureParam)
+ case MeasureTypes.Profiling => ProfilingMeasure(measureParam)
case _ =>
val errorMsg = s"Measure type '${measureParam.getType}' is not supported."
val exception = new NotImplementedError(errorMsg)
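With this factory case in place, any measure configured with `"type": "profiling"` dispatches to the new implementation; unrecognized types still fall through to the `NotImplementedError` branch.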
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
index 1f201f4..b1178f8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasure.scala
@@ -49,19 +49,16 @@
}
val selectCols = Seq(Total, Complete, InComplete).flatMap(e => Seq(lit(e), col(e)))
- val metricColumn: Column = map(selectCols: _*).as(measureViolationsColName)
+ val metricColumn: Column = map(selectCols: _*).as(valueColumn)
val input = sparkSession.read.table(measureParam.getDataSource)
- val resultDf = input.withColumn(measureViolationsColName, column)
+ val badRecordsDf = input.withColumn(valueColumn, column)
- val metricDf = preProcessMetrics(
- resultDf
- .withColumn(Total, lit(1))
- .agg(sum(Total).as(Total), sum(measureViolationsColName).as(InComplete))
- .withColumn(Complete, col(Total) - col(InComplete))
- .select(metricColumn))
-
- val badRecordsDf = preProcessRecords(resultDf)
+ val metricDf = badRecordsDf
+ .withColumn(Total, lit(1))
+ .agg(sum(Total).as(Total), sum(valueColumn).as(InComplete))
+ .withColumn(Complete, col(Total) - col(InComplete))
+ .select(metricColumn)
(badRecordsDf, metricDf)
}
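Since `Measure.execute` now applies `preProcessRecords`/`preProcessMetrics` centrally, `CompletenessMeasure.impl` simply returns the raw record and metric frames and drops its own pre-processing calls.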
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
new file mode 100644
index 0000000..ef6ed73
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasure.scala
@@ -0,0 +1,152 @@
+package org.apache.griffin.measure.execution.impl
+
+import java.util.Locale
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+
+import org.apache.griffin.measure.configuration.dqdefinition.MeasureParam
+import org.apache.griffin.measure.execution.Measure
+import org.apache.griffin.measure.step.builder.ConstantColumns
+
+case class ProfilingMeasure(measureParam: MeasureParam) extends Measure {
+
+ import Measure._
+ import ProfilingMeasure._
+
+ override val supportsRecordWrite: Boolean = false
+
+ override val supportsMetricWrite: Boolean = true
+
+ private val roundScale: Int = getFromConfig[java.lang.Integer](RoundScaleStr, 3)
+ private val approxDistinctCount: Boolean =
+ getFromConfig[java.lang.Boolean](ApproxDistinctCountStr, true)
+
+ override def impl(sparkSession: SparkSession): (DataFrame, DataFrame) = {
+ val input = sparkSession.read.table(measureParam.getDataSource)
+ val profilingColNames =
+ getFromConfig[Seq[String]](Expression, input.columns)
+ .map(_.trim.toLowerCase(Locale.ROOT))
+ .toSet
+
+    assert(
+      profilingColNames.nonEmpty,
+      "No columns were provided for profiling. Check the 'expr' option of this measure.")
+
+    val profilingCols = input.schema.fields.filter(f =>
+      profilingColNames.contains(f.name.toLowerCase(Locale.ROOT)) &&
+        !f.name.equalsIgnoreCase(ConstantColumns.tmst))
+
+    val profilingExprs = profilingCols.foldLeft(Array.empty[Column])((exprList, field) => {
+      val colName = field.name
+      val columnExprs = getProfilingExprs(field, roundScale, approxDistinctCount)
+
+      exprList :+
+        map(lit(colName).as(ColName), struct(columnExprs: _*))
+          .as(s"$DetailsPrefix$colName")
+    })
+
+ val aggregateDf = profilingCols
+ .foldLeft(input)((df, field) => {
+ val colName = field.name
+ val column = col(colName)
+
+ val lengthColName = lengthColFn(colName)
+ val nullColName = nullsInColFn(colName)
+
+ df.withColumn(lengthColName, length(column))
+ .withColumn(nullColName, when(isnull(column).or(isnan(column)), 1L).otherwise(0L))
+ })
+ .agg(count(lit(1L)).as(TotalCount), profilingExprs: _*)
+
+ val detailCols = aggregateDf.columns.filter(_.startsWith(DetailsPrefix)).map(col)
+
+ val metricDf = aggregateDf
+ .withColumn(ColumnDetails, array(detailCols: _*))
+ .select(TotalCount, ColumnDetails)
+ .select(to_json(struct(AllColumns)).as(valueColumn))
+
+ (sparkSession.emptyDataFrame, metricDf)
+ }
+
+}
+
+object ProfilingMeasure {
+
+ /**
+ * Options Keys
+ */
+ private final val RoundScaleStr: String = "round.scale"
+ private final val ApproxDistinctCountStr: String = "approx.distinct.count"
+
+ /**
+ * Structure Keys
+ */
+ private final val ColumnDetails: String = "column_details"
+ private final val ColName: String = "col_name"
+ private final val DataTypeStr: String = "data_type"
+ private final val TotalCount: String = "total_count"
+
+ /**
+ * Prefix Keys
+ */
+ private final val ApproxPrefix: String = "approx_"
+ private final val DetailsPrefix: String = "details_"
+ private final val ColumnLengthPrefix: String = "col_len"
+ private final val IsNullPrefix: String = "is_null"
+
+ /**
+ * Column Detail Keys
+ */
+ private final val NullCount: String = "null_count"
+ private final val DistinctCount: String = "distinct_count"
+ private final val Min: String = "min"
+ private final val Max: String = "max"
+ private final val Avg: String = "avg"
+ private final val StdDeviation: String = "std_dev"
+ private final val Variance: String = "variance"
+ private final val Kurtosis: String = "kurtosis"
+ private final val MinColLength: String = s"${Min}_$ColumnLengthPrefix"
+ private final val MaxColLength: String = s"${Max}_$ColumnLengthPrefix"
+ private final val AvgColLength: String = s"${Avg}_$ColumnLengthPrefix"
+
+ private final val AllColumns: String = "*"
+
+ private def lengthColFn(colName: String): String = s"${ColumnLengthPrefix}_$colName"
+
+ private def nullsInColFn(colName: String): String = s"${IsNullPrefix}_$colName"
+
+ private def forNumericFn(t: DataType, value: Column, alias: String): Column = {
+ (if (t.isInstanceOf[NumericType]) value else lit(null)).as(alias)
+ }
+
+ private def getProfilingExprs(
+ field: StructField,
+ roundScale: Int,
+ approxDistinctCount: Boolean): Seq[Column] = {
+ val colName = field.name
+ val colType = field.dataType
+
+ val column = col(colName)
+ val lengthColExpr = col(lengthColFn(colName))
+ val nullColExpr = col(nullsInColFn(colName))
+ val distinctCountExpr =
+ if (approxDistinctCount) approx_count_distinct(column).as(s"$ApproxPrefix$DistinctCount")
+ else countDistinct(column).as(DistinctCount)
+
+ Seq(
+ lit(colType.catalogString).as(DataTypeStr),
+ min(lengthColExpr).as(MinColLength),
+ max(lengthColExpr).as(MaxColLength),
+ forNumericFn(colType, avg(lengthColExpr), AvgColLength),
+ forNumericFn(colType, min(column), Min),
+ forNumericFn(colType, max(column), Max),
+ forNumericFn(colType, bround(avg(column), roundScale), Avg),
+ forNumericFn(colType, bround(stddev(column), roundScale), StdDeviation),
+ forNumericFn(colType, bround(variance(column), roundScale), Variance),
+ forNumericFn(colType, bround(kurtosis(column), roundScale), Kurtosis),
+ distinctCountExpr,
+ sum(nullColExpr).as(NullCount))
+ }
+}
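The measure computes everything in a single aggregation pass: each profiled column contributes a `map(col_name -> struct(stats...))` expression, all of them are aggregated alongside the row count, and the resulting single row is serialized with `to_json(struct("*"))`. A toy, self-contained sketch of that pattern (column names and sample data are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1001L, "94107"), (1002L, "94110"), (1003L, null))
  .toDF("user_id", "post_code")

// One map(col_name -> struct(...)) expression per profiled column.
val userIdDetails = map(
  lit("user_id").as("col_name"),
  struct(
    min($"user_id").as("min"),
    max($"user_id").as("max"),
    bround(avg($"user_id"), 2).as("avg"),
    approx_count_distinct($"user_id").as("approx_distinct_count"))
).as("details_user_id")

// Single pass: total count plus all per-column detail maps, then one JSON value.
val metricDf = df
  .agg(count(lit(1L)).as("total_count"), userIdDetails)
  .select(to_json(struct(col("*"))).as("value"))

metricDf.show(truncate = false)
```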
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 8459e88..936d329 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -45,13 +45,13 @@
def validate(): Boolean = true
override def open(applicationId: String): Unit = {
- griffinLogger.info(
+ info(
s"Opened ConsoleSink for job with name '$jobName', " +
s"timestamp '$timeStamp' and applicationId '$applicationId'")
}
override def close(): Unit = {
- griffinLogger.info(
+ info(
s"Closed ConsoleSink for job with name '$jobName' and timestamp '$timeStamp'")
}
@@ -60,7 +60,7 @@
override def sinkRecords(records: Iterable[String], name: String): Unit = {}
override def sinkMetrics(metrics: Map[String, Any]): Unit = {
- griffinLogger.info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
+ info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
}
override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 980bded..f6802ec 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -31,7 +31,7 @@
val writeTimestampOpt: Option[Long] = None
def execute(context: DQContext): Try[Boolean] = Try {
- context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
+ val res = context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
val (t, metric) = pair
val pr =
try {
@@ -50,6 +50,9 @@
}
ret && pr
}
+
+ context.metricWrapper.clear()
+ res
}
}
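The new `clear()` call matters whenever the same `MetricWrapper` outlives a flush, e.g. across streaming batches: without it, each later flush would re-emit every previously flushed metric. A minimal sketch of the flush-then-clear contract (types reduced to a plain map; the class name is hypothetical):

```scala
import scala.collection.mutable

// Hypothetical reduction of MetricWrapper: metrics accumulate per timestamp,
// flush returns everything buffered so far, and clear() resets the buffer so
// the next flush carries only new metrics.
class MetricBufferSketch {
  private val metrics: mutable.Map[Long, Map[String, Any]] = mutable.Map.empty

  def insert(timestamp: Long, metric: Map[String, Any]): Unit =
    metrics(timestamp) = metrics.getOrElse(timestamp, Map.empty) ++ metric

  def flush: Map[Long, Map[String, Any]] = metrics.toMap

  def clear(): Unit = metrics.clear()
}
```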
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
index db6bb63..43cfcee 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
@@ -21,20 +21,22 @@
import scala.reflect._
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.commons.lang3.StringEscapeUtils
object JsonUtil {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ mapper.configure(SerializationFeature.INDENT_OUTPUT, true)
def toJson(value: Map[Symbol, Any]): String = {
toJson(value map { case (k, v) => k.name -> v })
}
def toJson(value: Any): String = {
- mapper.writeValueAsString(value)
+ StringEscapeUtils.unescapeJava(mapper.writeValueAsString(value))
}
def fromJson[T: ClassTag](json: String): T = {