Merge pull request #527 from joohnnie/GRIFFIN-280
GRIFFIN-280 update travis config to start griffin docker container
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
index 796c797..743b05d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
@@ -29,7 +29,7 @@
val name = getStepName(ruleParam.getOutDfName())
val inputDfName = getStepName(ruleParam.getInDfName())
val transformStep = DataFrameOpsTransformStep(
- name, inputDfName, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+ name, inputDfName, ruleParam.getRule, ruleParam.getDetails, None, ruleParam.getCache)
transformStep +: buildDirectWriteSteps(ruleParam)
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
index b5dfd0c..0fdf20a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
@@ -28,7 +28,7 @@
def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
val name = getStepName(ruleParam.getOutDfName())
val transformStep = SparkSqlTransformStep(
- name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+ name, ruleParam.getRule, ruleParam.getDetails, None, ruleParam.getCache)
transformStep +: buildDirectWriteSteps(ruleParam)
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 31eef69..3bb5737 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -81,25 +81,22 @@
s"SELECT ${selClause} FROM `${sourceName}` " +
s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
}
- val missRecordsTransStep =
- SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
val missRecordsWriteSteps = procType match {
case BatchProcessType =>
val rwName =
ruleParam.getOutputOpt(RecordOutputType).
flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
- RecordWriteStep(rwName, missRecordsTableName) :: Nil
- case StreamingProcessType => Nil
- }
- val missRecordsUpdateWriteSteps = procType match {
- case BatchProcessType => Nil
+ RecordWriteStep(rwName, missRecordsTableName)
case StreamingProcessType =>
val dsName =
ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
- DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
+ DataSourceUpdateWriteStep(dsName, missRecordsTableName)
}
+ val missRecordsTransStep =
+ SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, Some(missRecordsWriteSteps), true)
+
// 2. miss count
val missCountTableName = "__missCount"
val missColName = details.getStringOrKey(_miss)
@@ -151,23 +148,23 @@
|ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${missCountTableName}`.`${ConstantColumns.tmst}`
""".stripMargin
}
- val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
- accuracyTransStep.parentSteps += missCountTransStep
- accuracyTransStep.parentSteps += totalCountTransStep
- val accuracyMetricWriteSteps = procType match {
+
+ val accuracyMetricWriteStep = procType match {
case BatchProcessType =>
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
- MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
- case StreamingProcessType => Nil
+ Some(MetricWriteStep(mwName, accuracyTableName, flattenType))
+ case StreamingProcessType => None
}
- val batchWriteSteps =
- accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
+ val accuracyTransStep =
+ SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap, accuracyMetricWriteStep)
+ accuracyTransStep.parentSteps += missCountTransStep
+ accuracyTransStep.parentSteps += totalCountTransStep
procType match {
- case BatchProcessType => accuracyTransStep :: batchWriteSteps
+ case BatchProcessType => accuracyTransStep :: Nil
// streaming extra steps
case StreamingProcessType =>
// 5. accuracy metric merge
@@ -178,15 +175,16 @@
(AccuracyOprKeys._total -> totalColName),
(AccuracyOprKeys._matched -> matchedColName)
)
- val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
- accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
- accuracyMetricTransStep.parentSteps += accuracyTransStep
val accuracyMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
}
+ val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
+ accuracyTableName, accuracyMetricRule, accuracyMetricDetails, Some(accuracyMetricWriteStep))
+ accuracyMetricTransStep.parentSteps += accuracyTransStep
+
// 6. collect accuracy records
val accuracyRecordTableName = "__accuracyRecords"
@@ -196,9 +194,7 @@
|FROM `${accuracyMetricTableName}` WHERE `${ConstantColumns.record}`
""".stripMargin
}
- val accuracyRecordTransStep = SparkSqlTransformStep(
- accuracyRecordTableName, accuracyRecordSql, emptyMap)
- accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
+
val accuracyRecordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -206,10 +202,11 @@
RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
}
+ val accuracyRecordTransStep = SparkSqlTransformStep(
+ accuracyRecordTableName, accuracyRecordSql, emptyMap, Some(accuracyRecordWriteStep))
+ accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
- // extra steps
- val streamingWriteSteps = accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil
- accuracyRecordTransStep :: batchWriteSteps ++ streamingWriteSteps
+ accuracyRecordTransStep :: Nil
}
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 3df4a12..7312f29 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -81,7 +81,7 @@
s"SELECT ${selClause} FROM `${sourceName}`"
}
val sourceAliasTransStep =
- SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+ SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, None, true)
// 2. incomplete record
val incompleteRecordsTableName = "__incompleteRecords"
@@ -91,15 +91,17 @@
val incompleteRecordsSql =
s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
- val incompleteRecordTransStep =
- SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
- incompleteRecordTransStep.parentSteps += sourceAliasTransStep
val incompleteRecordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
.getOrElse(incompleteRecordsTableName)
RecordWriteStep(rwName, incompleteRecordsTableName)
}
+ val incompleteRecordTransStep =
+ SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap,
+ Some(incompleteRecordWriteStep), true)
+ incompleteRecordTransStep.parentSteps += sourceAliasTransStep
+
// 3. incomplete count
val incompleteCountTableName = "__incompleteCount"
@@ -149,21 +151,19 @@
|ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${incompleteCountTableName}`.`${ConstantColumns.tmst}`
""".stripMargin
}
- val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
- completeTransStep.parentSteps += incompleteCountTransStep
- completeTransStep.parentSteps += totalCountTransStep
val completeWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, completeTableName, flattenType)
}
+ val completeTransStep =
+ SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap, Some(completeWriteStep))
+ completeTransStep.parentSteps += incompleteCountTransStep
+ completeTransStep.parentSteps += totalCountTransStep
val transSteps = completeTransStep :: Nil
- val writeSteps = incompleteRecordWriteStep :: completeWriteStep :: Nil
-
- // full steps
- transSteps ++ writeSteps
+ transSteps
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 0e2b10e..65460c3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -102,7 +102,7 @@
s"SELECT ${selClause} FROM `${sourceName}`"
}
val sourceAliasTransStep =
- SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+ SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, None, true)
// 2. total metric
val totalTableName = "__totalMetric"
@@ -110,11 +110,12 @@
val totalSql = {
s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
}
- val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
- totalTransStep.parentSteps += sourceAliasTransStep
val totalMetricWriteStep = {
MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt)
}
+ val totalTransStep =
+ SparkSqlTransformStep(totalTableName, totalSql, emptyMap, Some(totalMetricWriteStep))
+ totalTransStep.parentSteps += sourceAliasTransStep
// 3. group by self
val selfGroupTableName = "__selfGroup"
@@ -128,13 +129,12 @@
""".stripMargin
}
val selfGroupTransStep =
- SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+ SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, None, true)
selfGroupTransStep.parentSteps += sourceAliasTransStep
val transSteps1 = totalTransStep :: selfGroupTransStep :: Nil
- val writeSteps1 = totalMetricWriteStep :: Nil
- val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
+ val (transSteps2, dupCountTableName) = procType match {
case StreamingProcessType if (withOlderTable) =>
// 4.0 update old data
val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName)
@@ -208,14 +208,12 @@
""".stripMargin
}
val finalDupCountTransStep =
- SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+ SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, None, true)
finalDupCountTransStep.parentSteps += groupTransStep
- ((finalDupCountTransStep :: Nil, targetDsUpdateWriteStep :: Nil),
- finalDupCountTableName)
+ (finalDupCountTransStep :: targetDsUpdateWriteStep :: Nil, finalDupCountTableName)
case _ =>
- ((selfGroupTransStep :: Nil, totalMetricWriteStep :: Nil),
- selfGroupTableName)
+ (selfGroupTransStep :: Nil, selfGroupTableName)
}
// 8. distinct metric
@@ -227,16 +225,16 @@
|FROM `${dupCountTableName}` WHERE `${ConstantColumns.distinct}`
""".stripMargin
}
- val distTransStep = SparkSqlTransformStep(distTableName, distSql, emptyMap)
val distMetricWriteStep = {
MetricWriteStep(distColName, distTableName, EntriesFlattenType, writeTimestampOpt)
}
+ val distTransStep =
+ SparkSqlTransformStep(distTableName, distSql, emptyMap, Some(distMetricWriteStep))
val transSteps3 = distTransStep :: Nil
- val writeSteps3 = distMetricWriteStep :: Nil
val duplicationArrayName = details.getString(_duplicationArray, "")
- val (transSteps4, writeSteps4) = if (duplicationArrayName.nonEmpty) {
+ val transSteps4 = if (duplicationArrayName.nonEmpty) {
val recordEnable = details.getBoolean(_recordEnable, false)
if (groupAliases.size > 0) {
// with some group by requirement
@@ -278,12 +276,23 @@
|WHERE NOT `${ConstantColumns.distinct}` OR `${ConstantColumns.rowNumber}` > 1
""".stripMargin
}
- val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
- dupItemsTransStep.parentSteps += rnTransStep
val dupItemsWriteStep = {
val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
}
+ val dupItemsTransStep = {
+ if (recordEnable) {
+ SparkSqlTransformStep(
+ dupItemsTableName,
+ dupItemsSql,
+ emptyMap,
+ Some(dupItemsWriteStep)
+ )
+ } else {
+ SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
+ }
+ }
+ dupItemsTransStep.parentSteps += rnTransStep
// 12. group by dup Record metric
val groupDupMetricTableName = "__groupDupMetric"
@@ -295,25 +304,22 @@
|FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
""".stripMargin
}
- val groupDupMetricTransStep =
- SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
- groupDupMetricTransStep.parentSteps += dupItemsTransStep
val groupDupMetricWriteStep = {
MetricWriteStep(duplicationArrayName,
groupDupMetricTableName,
ArrayFlattenType,
writeTimestampOpt)
}
+ val groupDupMetricTransStep =
+ SparkSqlTransformStep(
+ groupDupMetricTableName,
+ groupDupMetricSql,
+ emptyMap,
+ Some(groupDupMetricWriteStep)
+ )
+ groupDupMetricTransStep.parentSteps += dupItemsTransStep
- val msteps = groupDupMetricTransStep :: Nil
- val wsteps = if (recordEnable) {
- dupItemsWriteStep :: groupDupMetricWriteStep :: Nil
- } else {
- groupDupMetricWriteStep :: Nil
- }
-
- (msteps, wsteps)
-
+ groupDupMetricTransStep :: Nil
} else {
// no group by requirement
// 9. duplicate record
@@ -330,16 +336,25 @@
|FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
""".stripMargin
}
- val dupRecordTransStep =
- SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
-
val dupRecordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
.getOrElse(dupRecordTableName)
-
RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
}
+ val dupRecordTransStep = {
+ if (recordEnable) {
+ SparkSqlTransformStep(
+ dupRecordTableName,
+ dupRecordSql,
+ emptyMap,
+ Some(dupRecordWriteStep),
+ true
+ )
+ } else {
+ SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, None, true)
+ }
+ }
// 10. duplicate metric
val dupMetricTableName = "__dupMetric"
@@ -350,8 +365,6 @@
|FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
""".stripMargin
}
- val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
- dupMetricTransStep.parentSteps += dupRecordTransStep
val dupMetricWriteStep = {
MetricWriteStep(
duplicationArrayName,
@@ -360,22 +373,21 @@
writeTimestampOpt
)
}
+ val dupMetricTransStep =
+ SparkSqlTransformStep(
+ dupMetricTableName,
+ dupMetricSql,
+ emptyMap,
+ Some(dupMetricWriteStep)
+ )
+ dupMetricTransStep.parentSteps += dupRecordTransStep
- val msteps = dupMetricTransStep :: Nil
- val wsteps = if (recordEnable) {
- dupRecordWriteStep :: dupMetricWriteStep :: Nil
- } else {
- dupMetricWriteStep :: Nil
- }
-
- (msteps, wsteps)
+ dupMetricTransStep :: Nil
}
- } else (Nil, Nil)
+ } else Nil
// full steps
- transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
- writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
-
+ transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
index 492f4fd..e9a65b4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -21,9 +21,10 @@
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.configuration.enums._
-import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.dsl.expr.Expr
+import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep, WriteStep}
trait Expr2DQSteps extends Loggable with Serializable {
@@ -31,7 +32,6 @@
protected val emptyMap = Map[String, Any]()
def getDQSteps(): Seq[DQStep]
-
}
/**
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index af493af..68ca2f4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -97,14 +97,15 @@
s"${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
}
val profilingName = ruleParam.getOutDfName()
- val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
val profilingMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, profilingName, flattenType)
}
- profilingTransStep :: profilingMetricWriteStep :: Nil
+ val profilingTransStep =
+ SparkSqlTransformStep(profilingName, profilingSql, details, Some(profilingMetricWriteStep))
+ profilingTransStep :: Nil
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 03c2c8d..5a3acfb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -104,7 +104,7 @@
s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` " +
s"FROM `${inTimeTableName}`"
}
- val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
+ val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, None, true)
latencyTransStep.parentSteps += inTimeTransStep
// 3. timeliness metric
@@ -129,27 +129,26 @@
|GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
}
- val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
- metricTransStep.parentSteps += latencyTransStep
val metricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, metricTableName, flattenType)
}
+ val metricTransStep =
+ SparkSqlTransformStep(metricTableName, metricSql, emptyMap, Some(metricWriteStep))
+ metricTransStep.parentSteps += latencyTransStep
// current steps
val transSteps1 = metricTransStep :: Nil
- val writeSteps1 = metricWriteStep :: Nil
// 4. timeliness record
- val (transSteps2, writeSteps2) = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
+ val transSteps2 = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
case Some(tsh) =>
val recordTableName = "__lateRecords"
val recordSql = {
s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
}
- val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
val recordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -157,12 +156,16 @@
RecordWriteStep(rwName, recordTableName, None)
}
- (recordTransStep :: Nil, recordWriteStep :: Nil)
- case _ => (Nil, Nil)
+ val recordTransStep =
+ SparkSqlTransformStep(recordTableName, recordSql, emptyMap, Some(recordWriteStep))
+ recordTransStep.parentSteps += latencyTransStep
+
+ recordTransStep :: Nil
+ case _ => Nil
}
// 5. ranges
- val (transSteps3, writeSteps3) = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
+ val transSteps3 = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
case Some(stepSize) =>
// 5.1 range
val rangeTableName = "__range"
@@ -174,6 +177,7 @@
""".stripMargin
}
val rangeTransStep = SparkSqlTransformStep(rangeTableName, rangeSql, emptyMap)
+ rangeTransStep.parentSteps += latencyTransStep
// 5.2 range metric
val rangeMetricTableName = "__rangeMetric"
@@ -190,20 +194,20 @@
|FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`, `${stepColName}`
""".stripMargin
}
- val rangeMetricTransStep =
- SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
- rangeMetricTransStep.parentSteps += rangeTransStep
val rangeMetricWriteStep = {
MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
}
+ val rangeMetricTransStep =
+ SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap, Some(rangeMetricWriteStep))
+ rangeMetricTransStep.parentSteps += rangeTransStep
- (rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
- case _ => (Nil, Nil)
+ rangeMetricTransStep :: Nil
+ case _ => Nil
}
// 6. percentiles
val percentiles = getPercentiles(details)
- val (transSteps4, writeSteps4) = if (percentiles.size > 0) {
+ val transSteps4 = if (percentiles.size > 0) {
val percentileTableName = "__percentile"
val percentileColName = details.getStringOrKey(_percentileColPrefix)
val percentileCols = percentiles.map { pct =>
@@ -217,19 +221,18 @@
|FROM `${latencyTableName}`
""".stripMargin
}
- val percentileTransStep =
- SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
-
val percentileWriteStep = {
MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType)
}
+ val percentileTransStep =
+ SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap, Some(percentileWriteStep))
+ percentileTransStep.parentSteps += latencyTransStep
- (percentileTransStep :: Nil, percentileWriteStep :: Nil)
- } else (Nil, Nil)
+ percentileTransStep :: Nil
+ } else Nil
// full steps
- transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
- writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
+ transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 7f259ea..a19b35c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -117,7 +117,7 @@
s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` " +
s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
}
- val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)
+ val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, None, true)
groupTransStep.parentSteps += joinedTransStep
// 5. total metric
@@ -131,8 +131,9 @@
|FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
}
- val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType)
+ val totalTransStep =
+ SparkSqlTransformStep(totalTableName, totalSql, emptyMap, Some(totalMetricWriteStep))
// 6. unique record
val uniqueRecordTableName = "__uniqueRecord"
@@ -155,24 +156,21 @@
|FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
}
- val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
- uniqueTransStep.parentSteps += uniqueRecordTransStep
-
val uniqueMetricWriteStep =
MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
+ val uniqueTransStep =
+ SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap, Some(uniqueMetricWriteStep))
+ uniqueTransStep.parentSteps += uniqueRecordTransStep
val transSteps1 = totalTransStep :: uniqueTransStep :: Nil
- val writeSteps1 = totalMetricWriteStep :: uniqueMetricWriteStep :: Nil
val duplicationArrayName = details.getString(_duplicationArray, "")
- val (transSteps2, writeSteps2) = if (duplicationArrayName.nonEmpty) {
+ val transSteps2 = if (duplicationArrayName.nonEmpty) {
// 8. duplicate record
val dupRecordTableName = "__dupRecords"
val dupRecordSql = {
s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
}
- val dupRecordTransStep =
- SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
val dupRecordWriteStep = {
val rwName =
@@ -181,6 +179,8 @@
RecordWriteStep(rwName, dupRecordTableName)
}
+ val dupRecordTransStep =
+ SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, Some(dupRecordWriteStep), true)
// 9. duplicate metric
val dupMetricTableName = "__dupMetric"
@@ -201,18 +201,22 @@
|GROUP BY ${dupMetricGroupbyClause}
""".stripMargin
}
- val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
- dupMetricTransStep.parentSteps += dupRecordTransStep
val dupMetricWriteStep = {
MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType)
}
+ val dupMetricTransStep =
+ SparkSqlTransformStep(dupMetricTableName,
+ dupMetricSql,
+ emptyMap,
+ Some(dupMetricWriteStep)
+ )
+ dupMetricTransStep.parentSteps += dupRecordTransStep
- (dupMetricTransStep :: Nil,
- dupRecordWriteStep :: dupMetricWriteStep :: Nil)
- } else (Nil, Nil)
+ dupMetricTransStep :: Nil
+ } else Nil
// full steps
- transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
+ transSteps1 ++ transSteps2
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index b07595a..c393706 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -19,14 +19,16 @@
package org.apache.griffin.measure.step.transform
import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.write.WriteStep
/**
* data frame ops transform step
*/
-case class DataFrameOpsTransformStep(name: String,
+case class DataFrameOpsTransformStep[T <: WriteStep](name: String,
inputDfName: String,
rule: String,
details: Map[String, Any],
+ writeStepOpt: Option[T] = None,
cache: Boolean = false
) extends TransformStep {
@@ -43,7 +45,10 @@
}
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
- true
+ writeStepOpt match {
+ case Some(writeStep) => writeStep.execute(context)
+ case None => true
+ }
} catch {
case e: Throwable =>
error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}", e)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 59ea822..00edf07 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -19,23 +19,27 @@
package org.apache.griffin.measure.step.transform
import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.write.WriteStep
/**
* spark sql transform step
*/
-case class SparkSqlTransformStep(name: String,
- rule: String,
- details: Map[String, Any],
- cache: Boolean = false
- ) extends TransformStep {
-
+case class SparkSqlTransformStep[T <: WriteStep](name: String,
+ rule: String,
+ details: Map[String, Any],
+ writeStepOpt: Option[T] = None,
+ cache: Boolean = false
+ ) extends TransformStep {
def doExecute(context: DQContext): Boolean = {
val sqlContext = context.sqlContext
try {
val df = sqlContext.sql(rule)
if (cache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
- true
+ writeStepOpt match {
+ case Some(writeStep) => writeStep.execute(context)
+ case None => true
+ }
} catch {
case e: Throwable =>
error(s"run spark sql [ ${rule} ] error: ${e.getMessage}", e)
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index e640c45..5314669 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -19,15 +19,14 @@
package org.apache.griffin.measure.step
import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.scalatest._
+import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.enums.BatchProcessType
import org.apache.griffin.measure.context.ContextId
import org.apache.griffin.measure.context.DQContext
-import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.step.transform.TransformStep
-import org.scalatest._
-
class TransformStepTest extends FlatSpec with Matchers with DataFrameSuiteBase with Loggable {
case class DualTransformStep(name: String,
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
index 3107852..1a9c452 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
@@ -27,6 +27,7 @@
import javax.persistence.FetchType;
import javax.persistence.JoinColumn;
import javax.persistence.OneToMany;
+import javax.persistence.OrderBy;
@Entity
@@ -36,6 +37,7 @@
@OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "evaluate_rule_id")
+ @OrderBy("id ASC")
private List<Rule> rules = new ArrayList<>();
public List<Rule> getRules() {