Merge pull request #527 from joohnnie/GRIFFIN-280

GRIFFIN-280 bind write steps to transform steps
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
index 796c797..743b05d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
@@ -29,7 +29,7 @@
     val name = getStepName(ruleParam.getOutDfName())
     val inputDfName = getStepName(ruleParam.getInDfName())
     val transformStep = DataFrameOpsTransformStep(
-      name, inputDfName, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+      name, inputDfName, ruleParam.getRule, ruleParam.getDetails, None, ruleParam.getCache)
     transformStep +: buildDirectWriteSteps(ruleParam)
   }
 
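Note on the call sites above: `writeStepOpt` now sits between `details` and `cache` in both transform step constructors, so callers that pass `cache` positionally must thread a `None` through (or switch to a named argument). A minimal sketch with stand-in types, not Griffin's real classes, showing the two equivalent spellings:

    // Stand-in mirroring only the new parameter order.
    trait WriteStep
    case class SparkSqlTransformStep[T <: WriteStep](name: String,
                                                     rule: String,
                                                     details: Map[String, Any],
                                                     writeStepOpt: Option[T] = None,
                                                     cache: Boolean = false)

    object CallSiteSketch extends App {
      val emptyMap = Map.empty[String, Any]
      // As in this PR: None passed positionally before cache.
      val a = SparkSqlTransformStep("out", "SELECT 1", emptyMap, None, true)
      // Equivalent, leaning on the default value instead.
      val b = SparkSqlTransformStep("out", "SELECT 1", emptyMap, cache = true)
      println(a == b)  // true: both leave writeStepOpt as None
    }
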
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
index b5dfd0c..0fdf20a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
@@ -28,7 +28,7 @@
   def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
     val name = getStepName(ruleParam.getOutDfName())
     val transformStep = SparkSqlTransformStep(
-      name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+      name, ruleParam.getRule, ruleParam.getDetails, None, ruleParam.getCache)
     transformStep +: buildDirectWriteSteps(ruleParam)
   }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 31eef69..3bb5737 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -81,25 +81,22 @@
         s"SELECT ${selClause} FROM `${sourceName}` " +
           s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
       }
-      val missRecordsTransStep =
-        SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
 
-      val missRecordsWriteSteps = procType match {
+      val missRecordsWriteStep = procType match {
         case BatchProcessType =>
           val rwName =
             ruleParam.getOutputOpt(RecordOutputType).
               flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
-          RecordWriteStep(rwName, missRecordsTableName) :: Nil
-        case StreamingProcessType => Nil
-      }
-      val missRecordsUpdateWriteSteps = procType match {
-        case BatchProcessType => Nil
+          RecordWriteStep(rwName, missRecordsTableName)
         case StreamingProcessType =>
           val dsName =
             ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
-          DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
+          DataSourceUpdateWriteStep(dsName, missRecordsTableName)
       }
 
+      val missRecordsTransStep =
+        SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, Some(missRecordsWriteStep), true)
+
       // 2. miss count
       val missCountTableName = "__missCount"
       val missColName = details.getStringOrKey(_miss)
@@ -151,23 +148,23 @@
              |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${missCountTableName}`.`${ConstantColumns.tmst}`
          """.stripMargin
       }
-      val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
-      accuracyTransStep.parentSteps += missCountTransStep
-      accuracyTransStep.parentSteps += totalCountTransStep
-      val accuracyMetricWriteSteps = procType match {
+
+      val accuracyMetricWriteStep = procType match {
         case BatchProcessType =>
           val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
           val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
           val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
-          MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
-        case StreamingProcessType => Nil
+          Some(MetricWriteStep(mwName, accuracyTableName, flattenType))
+        case StreamingProcessType => None
       }
 
-      val batchWriteSteps =
-        accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
+      val accuracyTransStep =
+        SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap, accuracyMetricWriteStep)
+      accuracyTransStep.parentSteps += missCountTransStep
+      accuracyTransStep.parentSteps += totalCountTransStep
 
       procType match {
-        case BatchProcessType => accuracyTransStep :: batchWriteSteps
+        case BatchProcessType => accuracyTransStep :: Nil
         // streaming extra steps
         case StreamingProcessType =>
           // 5. accuracy metric merge
@@ -178,15 +175,15 @@
             (AccuracyOprKeys._total -> totalColName),
             (AccuracyOprKeys._matched -> matchedColName)
           )
-          val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
-            accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
-          accuracyMetricTransStep.parentSteps += accuracyTransStep
           val accuracyMetricWriteStep = {
             val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
             val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
             val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
             MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
           }
+          val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
+            accuracyTableName, accuracyMetricRule, accuracyMetricDetails, Some(accuracyMetricWriteStep))
+          accuracyMetricTransStep.parentSteps += accuracyTransStep
 
           // 6. collect accuracy records
           val accuracyRecordTableName = "__accuracyRecords"
@@ -196,9 +194,7 @@
                |FROM `${accuracyMetricTableName}` WHERE `${ConstantColumns.record}`
              """.stripMargin
           }
-          val accuracyRecordTransStep = SparkSqlTransformStep(
-            accuracyRecordTableName, accuracyRecordSql, emptyMap)
-          accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
+
           val accuracyRecordWriteStep = {
             val rwName =
               ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -206,10 +202,11 @@
 
             RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
           }
+          val accuracyRecordTransStep = SparkSqlTransformStep(
+            accuracyRecordTableName, accuracyRecordSql, emptyMap, Some(accuracyRecordWriteStep))
+          accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
 
-          // extra steps
-          val streamingWriteSteps = accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil
-          accuracyRecordTransStep :: batchWriteSteps ++ streamingWriteSteps
+          accuracyRecordTransStep :: Nil
       }
     }
   }
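
The reshaped match above replaces the two per-procType lists with a single bound write step: batch mode records the missing rows, streaming mode updates the data source. A minimal sketch (hypothetical stand-in types) of how `missRecordsWriteStep` types out to the common `WriteStep` supertype:

    trait WriteStep { def execute(): Boolean }
    case class RecordWriteStep(name: String, input: String) extends WriteStep {
      def execute(): Boolean = { println(s"write records of $input as $name"); true }
    }
    case class DataSourceUpdateWriteStep(dsName: String, input: String) extends WriteStep {
      def execute(): Boolean = { println(s"update data source $dsName from $input"); true }
    }

    sealed trait ProcType
    case object BatchProcessType extends ProcType
    case object StreamingProcessType extends ProcType

    object MissRecordsSketch extends App {
      val procType: ProcType = BatchProcessType
      // Both branches return a WriteStep subtype, so the match result is typed
      // as their least upper bound, WriteStep, and fits Option[T <: WriteStep].
      val missRecordsWriteStep: WriteStep = procType match {
        case BatchProcessType => RecordWriteStep("missRecords", "__missRecords")
        case StreamingProcessType => DataSourceUpdateWriteStep("source", "__missRecords")
      }
      missRecordsWriteStep.execute()
    }
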
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 3df4a12..7312f29 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -81,7 +81,7 @@
         s"SELECT ${selClause} FROM `${sourceName}`"
       }
       val sourceAliasTransStep =
-        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, None, true)
 
       // 2. incomplete record
       val incompleteRecordsTableName = "__incompleteRecords"
@@ -91,15 +91,16 @@
       val incompleteRecordsSql =
         s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
 
-      val incompleteRecordTransStep =
-        SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
-      incompleteRecordTransStep.parentSteps += sourceAliasTransStep
       val incompleteRecordWriteStep = {
         val rwName =
           ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
             .getOrElse(incompleteRecordsTableName)
         RecordWriteStep(rwName, incompleteRecordsTableName)
       }
+      val incompleteRecordTransStep =
+        SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap,
+          Some(incompleteRecordWriteStep), true)
+      incompleteRecordTransStep.parentSteps += sourceAliasTransStep
 
       // 3. incomplete count
       val incompleteCountTableName = "__incompleteCount"
@@ -149,21 +151,19 @@
              |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${incompleteCountTableName}`.`${ConstantColumns.tmst}`
          """.stripMargin
       }
-      val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
-      completeTransStep.parentSteps += incompleteCountTransStep
-      completeTransStep.parentSteps += totalCountTransStep
       val completeWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
         val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
         MetricWriteStep(mwName, completeTableName, flattenType)
       }
+      val completeTransStep =
+        SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap, Some(completeWriteStep))
+      completeTransStep.parentSteps += incompleteCountTransStep
+      completeTransStep.parentSteps += totalCountTransStep
 
       val transSteps = completeTransStep :: Nil
-      val writeSteps = incompleteRecordWriteStep :: completeWriteStep :: Nil
-
-      // full steps
-      transSteps ++ writeSteps
+      transSteps
     }
   }
 
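After this change the builder returns only `completeTransStep`; the record and metric writes are reached through the step graph instead of the flat result list. A simplified execution model (assumed, not Griffin's actual `TransformStep.execute`, which also handles caching and threading) of the ordering that makes this safe:

    import scala.collection.mutable

    trait WriteStep { def execute(): Boolean }
    case class MetricWriteStep(name: String) extends WriteStep {
      def execute(): Boolean = { println(s"write metric $name"); true }
    }

    case class TransformStep(name: String, writeStepOpt: Option[WriteStep] = None) {
      val parentSteps: mutable.ListBuffer[TransformStep] = mutable.ListBuffer()
      def execute(): Boolean = {
        val parentsOk = parentSteps.forall(_.execute())  // dependencies first
        if (!parentsOk) false
        else {
          println(s"transform $name")                    // then this step's rule
          writeStepOpt.forall(_.execute())               // then its bound write
        }
      }
    }

    object CompletenessSketch extends App {
      val incompleteCount = TransformStep("__incompleteCount")
      val totalCount = TransformStep("__totalCount")
      val complete = TransformStep("__complete", Some(MetricWriteStep("complete")))
      complete.parentSteps += incompleteCount
      complete.parentSteps += totalCount
      complete.execute()  // executing the root reaches every parent and write
    }
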
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 0e2b10e..65460c3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -102,7 +102,7 @@
         s"SELECT ${selClause} FROM `${sourceName}`"
       }
       val sourceAliasTransStep =
-        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, None, true)
 
       // 2. total metric
       val totalTableName = "__totalMetric"
@@ -110,11 +110,12 @@
       val totalSql = {
         s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
       }
-      val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
-      totalTransStep.parentSteps += sourceAliasTransStep
       val totalMetricWriteStep = {
         MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt)
       }
+      val totalTransStep =
+        SparkSqlTransformStep(totalTableName, totalSql, emptyMap, Some(totalMetricWriteStep))
+      totalTransStep.parentSteps += sourceAliasTransStep
 
       // 3. group by self
       val selfGroupTableName = "__selfGroup"
@@ -128,13 +129,12 @@
           """.stripMargin
       }
       val selfGroupTransStep =
-        SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+        SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, None, true)
       selfGroupTransStep.parentSteps += sourceAliasTransStep
 
       val transSteps1 = totalTransStep :: selfGroupTransStep :: Nil
-      val writeSteps1 = totalMetricWriteStep :: Nil
 
-      val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
+      val (transSteps2, dupCountTableName) = procType match {
         case StreamingProcessType if (withOlderTable) =>
           // 4.0 update old data
           val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName)
@@ -208,14 +208,12 @@
              """.stripMargin
           }
           val finalDupCountTransStep =
-            SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+            SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, None, true)
           finalDupCountTransStep.parentSteps += groupTransStep
 
-          ((finalDupCountTransStep :: Nil, targetDsUpdateWriteStep :: Nil),
-            finalDupCountTableName)
+          (finalDupCountTransStep :: targetDsUpdateWriteStep :: Nil, finalDupCountTableName)
         case _ =>
-          ((selfGroupTransStep :: Nil, totalMetricWriteStep :: Nil),
-            selfGroupTableName)
+          (selfGroupTransStep :: Nil, selfGroupTableName)
       }
 
       // 8. distinct metric
@@ -227,16 +225,16 @@
            |FROM `${dupCountTableName}` WHERE `${ConstantColumns.distinct}`
          """.stripMargin
       }
-      val distTransStep = SparkSqlTransformStep(distTableName, distSql, emptyMap)
       val distMetricWriteStep = {
         MetricWriteStep(distColName, distTableName, EntriesFlattenType, writeTimestampOpt)
       }
+      val distTransStep =
+        SparkSqlTransformStep(distTableName, distSql, emptyMap, Some(distMetricWriteStep))
 
       val transSteps3 = distTransStep :: Nil
-      val writeSteps3 = distMetricWriteStep :: Nil
 
       val duplicationArrayName = details.getString(_duplicationArray, "")
-      val (transSteps4, writeSteps4) = if (duplicationArrayName.nonEmpty) {
+      val transSteps4 = if (duplicationArrayName.nonEmpty) {
         val recordEnable = details.getBoolean(_recordEnable, false)
         if (groupAliases.size > 0) {
           // with some group by requirement
@@ -278,12 +276,23 @@
                |WHERE NOT `${ConstantColumns.distinct}` OR `${ConstantColumns.rowNumber}` > 1
                """.stripMargin
           }
-          val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
-          dupItemsTransStep.parentSteps += rnTransStep
           val dupItemsWriteStep = {
             val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
             RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
           }
+          val dupItemsTransStep = {
+            if (recordEnable) {
+              SparkSqlTransformStep(
+                dupItemsTableName,
+                dupItemsSql,
+                emptyMap,
+                Some(dupItemsWriteStep)
+              )
+            } else {
+              SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
+            }
+          }
+          dupItemsTransStep.parentSteps += rnTransStep
 
           // 12. group by dup Record metric
           val groupDupMetricTableName = "__groupDupMetric"
@@ -295,25 +304,22 @@
                |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
              """.stripMargin
           }
-          val groupDupMetricTransStep =
-            SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
-          groupDupMetricTransStep.parentSteps += dupItemsTransStep
           val groupDupMetricWriteStep = {
             MetricWriteStep(duplicationArrayName,
               groupDupMetricTableName,
               ArrayFlattenType,
               writeTimestampOpt)
           }
+          val groupDupMetricTransStep =
+            SparkSqlTransformStep(
+              groupDupMetricTableName,
+              groupDupMetricSql,
+              emptyMap,
+              Some(groupDupMetricWriteStep)
+            )
+          groupDupMetricTransStep.parentSteps += dupItemsTransStep
 
-          val msteps = groupDupMetricTransStep :: Nil
-          val wsteps = if (recordEnable) {
-            dupItemsWriteStep :: groupDupMetricWriteStep :: Nil
-          } else {
-            groupDupMetricWriteStep :: Nil
-          }
-
-          (msteps, wsteps)
-
+          groupDupMetricTransStep :: Nil
         } else {
           // no group by requirement
           // 9. duplicate record
@@ -330,16 +336,25 @@
                |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
               """.stripMargin
           }
-          val dupRecordTransStep =
-            SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
-
           val dupRecordWriteStep = {
             val rwName =
               ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
                 .getOrElse(dupRecordTableName)
-
             RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
           }
+          val dupRecordTransStep = {
+            if (recordEnable) {
+              SparkSqlTransformStep(
+                dupRecordTableName,
+                dupRecordSql,
+                emptyMap,
+                Some(dupRecordWriteStep),
+                true
+              )
+            } else {
+              SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, None, true)
+            }
+          }
 
           // 10. duplicate metric
           val dupMetricTableName = "__dupMetric"
@@ -350,8 +365,6 @@
                |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
               """.stripMargin
           }
-          val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
-          dupMetricTransStep.parentSteps += dupRecordTransStep
           val dupMetricWriteStep = {
             MetricWriteStep(
               duplicationArrayName,
@@ -360,22 +373,21 @@
               writeTimestampOpt
             )
           }
+          val dupMetricTransStep =
+            SparkSqlTransformStep(
+              dupMetricTableName,
+              dupMetricSql,
+              emptyMap,
+              Some(dupMetricWriteStep)
+            )
+          dupMetricTransStep.parentSteps += dupRecordTransStep
 
-          val msteps = dupMetricTransStep :: Nil
-          val wsteps = if (recordEnable) {
-            dupRecordWriteStep :: dupMetricWriteStep :: Nil
-          } else {
-            dupMetricWriteStep :: Nil
-          }
-
-          (msteps, wsteps)
+          dupMetricTransStep :: Nil
         }
-      } else (Nil, Nil)
+      } else Nil
 
       // full steps
-      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
-        writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
-
+      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4
     }
   }
 
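Worth noting in the hunks above: `recordEnable` now decides at construction time whether the record write step is bound at all, replacing the old post-hoc filtering of the write-step list. A minimal sketch of the pattern (stand-in types):

    trait WriteStep
    case class RecordWriteStep(name: String) extends WriteStep
    case class TransformStep(name: String, writeStepOpt: Option[WriteStep] = None)

    object ConditionalBindSketch extends App {
      val recordEnable = false  // e.g. details.getBoolean(_recordEnable, false)
      val dupRecordWriteStep = RecordWriteStep("__dupRecords")
      // Bind the write only when record output is enabled; the transform still
      // runs either way, since its table feeds the duplicate-metric step.
      val dupRecordTransStep =
        if (recordEnable) TransformStep("__dupRecords", Some(dupRecordWriteStep))
        else TransformStep("__dupRecords")
      println(dupRecordTransStep)
    }

An equivalent, slightly tighter spelling would compute `val writeStepOpt = if (recordEnable) Some(dupRecordWriteStep) else None` and call the constructor once.
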
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
index 492f4fd..e9a65b4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -21,9 +21,9 @@
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
 import org.apache.griffin.measure.configuration.enums._
-import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.dsl.expr.Expr
 
 trait Expr2DQSteps extends Loggable with Serializable {
 
@@ -31,7 +32,6 @@
   protected val emptyMap = Map[String, Any]()
 
   def getDQSteps(): Seq[DQStep]
-
 }
 
 /**
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index af493af..68ca2f4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -97,14 +97,15 @@
           s"${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
       }
       val profilingName = ruleParam.getOutDfName()
-      val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
       val profilingMetricWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
         val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
         MetricWriteStep(mwName, profilingName, flattenType)
       }
-      profilingTransStep :: profilingMetricWriteStep :: Nil
+      val profilingTransStep =
+        SparkSqlTransformStep(profilingName, profilingSql, details, Some(profilingMetricWriteStep))
+      profilingTransStep :: Nil
     }
   }
 
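Profiling is the smallest illustration of the whole refactor: the returned sequence shrinks from "transform step plus write step" to just the transform step carrying its write. A before/after sketch with stand-in types:

    trait DQStep
    case class MetricWriteStep(name: String) extends DQStep
    case class TransformStep(name: String,
                             writeStepOpt: Option[MetricWriteStep] = None) extends DQStep

    object ProfilingSketch extends App {
      val write = MetricWriteStep("profiling")
      val before: Seq[DQStep] = TransformStep("profiling") :: write :: Nil     // old shape
      val after: Seq[DQStep] = TransformStep("profiling", Some(write)) :: Nil  // new shape
      println(before)
      println(after)
    }
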
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 03c2c8d..5a3acfb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -104,7 +104,7 @@
         s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` " +
           s"FROM `${inTimeTableName}`"
       }
-      val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
+      val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, None, true)
       latencyTransStep.parentSteps += inTimeTransStep
 
       // 3. timeliness metric
@@ -129,27 +129,26 @@
              |GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
       }
-      val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
-      metricTransStep.parentSteps += latencyTransStep
       val metricWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
         val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
         MetricWriteStep(mwName, metricTableName, flattenType)
       }
+      val metricTransStep =
+        SparkSqlTransformStep(metricTableName, metricSql, emptyMap, Some(metricWriteStep))
+      metricTransStep.parentSteps += latencyTransStep
 
       // current steps
       val transSteps1 = metricTransStep :: Nil
-      val writeSteps1 = metricWriteStep :: Nil
 
       // 4. timeliness record
-      val (transSteps2, writeSteps2) = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
+      val transSteps2 = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
         case Some(tsh) =>
           val recordTableName = "__lateRecords"
           val recordSql = {
             s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
           }
-          val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
           val recordWriteStep = {
             val rwName =
               ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -157,12 +156,16 @@
 
             RecordWriteStep(rwName, recordTableName, None)
           }
-          (recordTransStep :: Nil, recordWriteStep :: Nil)
-        case _ => (Nil, Nil)
+          val recordTransStep =
+            SparkSqlTransformStep(recordTableName, recordSql, emptyMap, Some(recordWriteStep))
+          recordTransStep.parentSteps += latencyTransStep
+
+          recordTransStep :: Nil
+        case _ => Nil
       }
 
       // 5. ranges
-      val (transSteps3, writeSteps3) = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
+      val transSteps3 = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
         case Some(stepSize) =>
           // 5.1 range
           val rangeTableName = "__range"
@@ -174,6 +177,7 @@
              """.stripMargin
           }
           val rangeTransStep = SparkSqlTransformStep(rangeTableName, rangeSql, emptyMap)
+          rangeTransStep.parentSteps += latencyTransStep
 
           // 5.2 range metric
           val rangeMetricTableName = "__rangeMetric"
@@ -190,20 +194,20 @@
                  |FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`, `${stepColName}`
                 """.stripMargin
           }
-          val rangeMetricTransStep =
-            SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
-          rangeMetricTransStep.parentSteps += rangeTransStep
           val rangeMetricWriteStep = {
             MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
           }
+          val rangeMetricTransStep =
+            SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap, Some(rangeMetricWriteStep))
+          rangeMetricTransStep.parentSteps += rangeTransStep
 
-          (rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
-        case _ => (Nil, Nil)
+          rangeMetricTransStep :: Nil
+        case _ => Nil
       }
 
       // 6. percentiles
       val percentiles = getPercentiles(details)
-      val (transSteps4, writeSteps4) = if (percentiles.size > 0) {
+      val transSteps4 = if (percentiles.size > 0) {
         val percentileTableName = "__percentile"
         val percentileColName = details.getStringOrKey(_percentileColPrefix)
         val percentileCols = percentiles.map { pct =>
@@ -217,19 +221,18 @@
              |FROM `${latencyTableName}`
             """.stripMargin
         }
-        val percentileTransStep =
-          SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
-
         val percentileWriteStep = {
           MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType)
         }
+        val percentileTransStep =
+          SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap, Some(percentileWriteStep))
+        percentileTransStep.parentSteps += latencyTransStep
 
-        (percentileTransStep :: Nil, percentileWriteStep :: Nil)
-      } else (Nil, Nil)
+        percentileTransStep :: Nil
+      } else Nil
 
       // full steps
-      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
-        writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
+      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4
     }
   }
 
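Besides the rebinding, this file also gains explicit `parentSteps += latencyTransStep` edges for the record, range, and percentile steps, all of which select from the latency table. A toy model (assumed table names, simplified registration) of why the edges matter under graph-driven execution:

    import scala.collection.mutable

    case class TransformStep(name: String, registers: String) {
      val parentSteps = mutable.ListBuffer[TransformStep]()
      def execute(tables: mutable.Set[String]): Boolean =
        parentSteps.forall(_.execute(tables)) && {
          tables += registers  // register this step's output table
          true
        }
    }

    object TimelinessSketch extends App {
      val tables = mutable.Set[String]()
      val latency = TransformStep("latency", "__latency")  // assumed table name
      val record = TransformStep("record", "__lateRecords")
      record.parentSteps += latency  // the edge this diff adds
      record.execute(tables)
      assert(tables("__latency"))    // input table exists only because of the edge
    }
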
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 7f259ea..a19b35c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -117,7 +117,7 @@
         s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` " +
           s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
       }
-      val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)
+      val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, None, true)
       groupTransStep.parentSteps += joinedTransStep
 
       // 5. total metric
@@ -131,8 +131,9 @@
              |FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
       }
-      val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
       val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType)
+      val totalTransStep =
+        SparkSqlTransformStep(totalTableName, totalSql, emptyMap, Some(totalMetricWriteStep))
 
       // 6. unique record
       val uniqueRecordTableName = "__uniqueRecord"
@@ -155,24 +156,21 @@
              |FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
       }
-      val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
-      uniqueTransStep.parentSteps += uniqueRecordTransStep
-
       val uniqueMetricWriteStep =
         MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
+      val uniqueTransStep =
+        SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap, Some(uniqueMetricWriteStep))
+      uniqueTransStep.parentSteps += uniqueRecordTransStep
 
       val transSteps1 = totalTransStep :: uniqueTransStep :: Nil
-      val writeSteps1 = totalMetricWriteStep :: uniqueMetricWriteStep :: Nil
 
       val duplicationArrayName = details.getString(_duplicationArray, "")
-      val (transSteps2, writeSteps2) = if (duplicationArrayName.nonEmpty) {
+      val transSteps2 = if (duplicationArrayName.nonEmpty) {
         // 8. duplicate record
         val dupRecordTableName = "__dupRecords"
         val dupRecordSql = {
           s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
         }
-        val dupRecordTransStep =
-          SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
 
         val dupRecordWriteStep = {
           val rwName =
@@ -181,6 +179,8 @@
 
           RecordWriteStep(rwName, dupRecordTableName)
         }
+        val dupRecordTransStep =
+          SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, Some(dupRecordWriteStep), true)
 
         // 9. duplicate metric
         val dupMetricTableName = "__dupMetric"
@@ -201,18 +201,23 @@
              |GROUP BY ${dupMetricGroupbyClause}
           """.stripMargin
         }
-        val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
-        dupMetricTransStep.parentSteps += dupRecordTransStep
         val dupMetricWriteStep = {
           MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType)
         }
+        val dupMetricTransStep =
+          SparkSqlTransformStep(
+            dupMetricTableName,
+            dupMetricSql,
+            emptyMap,
+            Some(dupMetricWriteStep)
+          )
+        dupMetricTransStep.parentSteps += dupRecordTransStep
 
-        (dupMetricTransStep :: Nil,
-          dupRecordWriteStep :: dupMetricWriteStep :: Nil)
-      } else (Nil, Nil)
+        dupMetricTransStep :: Nil
+      } else Nil
 
       // full steps
-      transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
+      transSteps1 ++ transSteps2
     }
   }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index b07595a..c393706 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -19,14 +19,16 @@
 package org.apache.griffin.measure.step.transform
 
 import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.write.WriteStep
 
 /**
   * data frame ops transform step
   */
-case class DataFrameOpsTransformStep(name: String,
-                                     inputDfName: String,
-                                     rule: String,
-                                     details: Map[String, Any],
-                                     cache: Boolean = false
-                                    ) extends TransformStep {
+case class DataFrameOpsTransformStep[T <: WriteStep](name: String,
+                                                     inputDfName: String,
+                                                     rule: String,
+                                                     details: Map[String, Any],
+                                                     writeStepOpt: Option[T] = None,
+                                                     cache: Boolean = false
+                                                    ) extends TransformStep {
 
@@ -43,7 +45,10 @@
       }
       if (cache) context.dataFrameCache.cacheDataFrame(name, df)
       context.runTimeTableRegister.registerTable(name, df)
-      true
+      writeStepOpt match {
+        case Some(writeStep) => writeStep.execute(context)
+        case None => true
+      }
     } catch {
       case e: Throwable =>
         error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}", e)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 59ea822..00edf07 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -19,23 +19,27 @@
 package org.apache.griffin.measure.step.transform
 
 import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.write.WriteStep
 
 /**
   * spark sql transform step
   */
-case class SparkSqlTransformStep(name: String,
-                                 rule: String,
-                                 details: Map[String, Any],
-                                 cache: Boolean = false
-                                ) extends TransformStep {
-
+case class SparkSqlTransformStep[T <: WriteStep](name: String,
+                                                 rule: String,
+                                                 details: Map[String, Any],
+                                                 writeStepOpt: Option[T] = None,
+                                                 cache: Boolean = false
+                                                ) extends TransformStep {
   def doExecute(context: DQContext): Boolean = {
     val sqlContext = context.sqlContext
     try {
       val df = sqlContext.sql(rule)
       if (cache) context.dataFrameCache.cacheDataFrame(name, df)
       context.runTimeTableRegister.registerTable(name, df)
-      true
+      writeStepOpt match {
+        case Some(writeStep) => writeStep.execute(context)
+        case None => true
+      }
     } catch {
       case e: Throwable =>
         error(s"run spark sql [ ${rule} ] error: ${e.getMessage}", e)
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index e640c45..5314669 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -19,15 +19,14 @@
 package org.apache.griffin.measure.step
 
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import org.scalatest._
 
+import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.context.ContextId
 import org.apache.griffin.measure.context.DQContext
-import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.step.transform.TransformStep
 
-import org.scalatest._
-
 class TransformStepTest extends FlatSpec with Matchers with DataFrameSuiteBase with Loggable {
 
   case class DualTransformStep(name: String,
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
index 3107852..1a9c452 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
@@ -27,6 +27,7 @@
 import javax.persistence.FetchType;
 import javax.persistence.JoinColumn;
 import javax.persistence.OneToMany;
+import javax.persistence.OrderBy;
 
 
 @Entity
@@ -36,6 +37,7 @@
     @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
         CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "evaluate_rule_id")
+    @OrderBy("id ASC")
     private List<Rule> rules = new ArrayList<>();
 
     public List<Rule> getRules() {