build and run transform steps with multiple threads

There are sequence of dq steps in each DQJob,and run those steps one by one (with foldLeft function).

We can use multiple threads to run some of those steps which have no dependency.

For example:

In a DQBatchJob, a accuracyExpr will have for steps **_missRecords ,_missCount , __totalCount,   accu** .

_missCount and **_totalCount step can run at the same time .

In SeqDQStep ,it just need contains some root steps without dependency steps.

If each step knows it's dependency steps, and when they are ready, we can run the step itself .

Running step :
accu
| |---__missCount
| | |---__missRecords
| |---__totalCount

Running step :
__missCount
| |---__missRecords

Running step :
__missRecords

Running step :
__totalCount

Author: wankunde <wankunde@163.com>

Closes #504 from wankunde/master.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
index 60c8477..6a50ebb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
@@ -33,3 +33,12 @@
   def getNames(): Seq[String] = name :: Nil
 
 }
+
+object DQStepStatus extends Enumeration {
+  val PENDING = Value
+  val RUNNING = Value
+  val COMPLETE = Value
+  val FAILED = Value
+}
+
+
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index f7ff3ef..31eef69 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -111,6 +111,7 @@
             s"FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
       }
       val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
+      missCountTransStep.parentSteps += missRecordsTransStep
 
       // 3. total count
       val totalCountTableName = "__totalCount"
@@ -151,6 +152,8 @@
          """.stripMargin
       }
       val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
+      accuracyTransStep.parentSteps += missCountTransStep
+      accuracyTransStep.parentSteps += totalCountTransStep
       val accuracyMetricWriteSteps = procType match {
         case BatchProcessType =>
           val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
@@ -160,14 +163,12 @@
         case StreamingProcessType => Nil
       }
 
-      // accuracy current steps
-      val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil
-      val writeSteps1 =
+      val batchWriteSteps =
         accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
 
-      // streaming extra steps
-      val (transSteps2, writeSteps2) = procType match {
-        case BatchProcessType => (Nil, Nil)
+      procType match {
+        case BatchProcessType => accuracyTransStep :: batchWriteSteps
+        // streaming extra steps
         case StreamingProcessType =>
           // 5. accuracy metric merge
           val accuracyMetricTableName = "__accuracy"
@@ -179,6 +180,7 @@
           )
           val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
             accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
+          accuracyMetricTransStep.parentSteps += accuracyTransStep
           val accuracyMetricWriteStep = {
             val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
             val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
@@ -196,6 +198,7 @@
           }
           val accuracyRecordTransStep = SparkSqlTransformStep(
             accuracyRecordTableName, accuracyRecordSql, emptyMap)
+          accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
           val accuracyRecordWriteStep = {
             val rwName =
               ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -205,12 +208,9 @@
           }
 
           // extra steps
-          (accuracyMetricTransStep :: accuracyRecordTransStep :: Nil,
-            accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil)
+          val streamingWriteSteps = accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil
+          accuracyRecordTransStep :: batchWriteSteps ++ streamingWriteSteps
       }
-
-      // full steps
-      transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
     }
   }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 87cfa86..3df4a12 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -93,6 +93,7 @@
 
       val incompleteRecordTransStep =
         SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
+      incompleteRecordTransStep.parentSteps += sourceAliasTransStep
       val incompleteRecordWriteStep = {
         val rwName =
           ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -112,6 +113,7 @@
       }
       val incompleteCountTransStep =
         SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
+      incompleteCountTransStep.parentSteps += incompleteRecordTransStep
 
       // 4. total count
       val totalCountTableName = "__totalCount"
@@ -124,6 +126,7 @@
             s"FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
       }
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
+      totalCountTransStep.parentSteps += sourceAliasTransStep
 
       // 5. complete metric
       val completeTableName = ruleParam.getOutDfName()
@@ -147,6 +150,8 @@
          """.stripMargin
       }
       val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
+      completeTransStep.parentSteps += incompleteCountTransStep
+      completeTransStep.parentSteps += totalCountTransStep
       val completeWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
@@ -154,14 +159,8 @@
         MetricWriteStep(mwName, completeTableName, flattenType)
       }
 
-      val transSteps = {
-        sourceAliasTransStep :: incompleteRecordTransStep ::
-          incompleteCountTransStep :: totalCountTransStep ::
-          completeTransStep :: Nil
-      }
-      val writeSteps = {
-        incompleteRecordWriteStep :: completeWriteStep :: Nil
-      }
+      val transSteps = completeTransStep :: Nil
+      val writeSteps = incompleteRecordWriteStep :: completeWriteStep :: Nil
 
       // full steps
       transSteps ++ writeSteps
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 70fee6c..0e2b10e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -111,6 +111,7 @@
         s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
       }
       val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
+      totalTransStep.parentSteps += sourceAliasTransStep
       val totalMetricWriteStep = {
         MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt)
       }
@@ -128,8 +129,9 @@
       }
       val selfGroupTransStep =
         SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+      selfGroupTransStep.parentSteps += sourceAliasTransStep
 
-      val transSteps1 = sourceAliasTransStep :: totalTransStep :: selfGroupTransStep :: Nil
+      val transSteps1 = totalTransStep :: selfGroupTransStep :: Nil
       val writeSteps1 = totalMetricWriteStep :: Nil
 
       val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
@@ -163,6 +165,8 @@
             """.stripMargin
           }
           val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)
+          joinedTransStep.parentSteps += selfGroupTransStep
+          joinedTransStep.parentSteps += olderAliasTransStep
 
           // 6. group by joined data
           val groupTableName = "__group"
@@ -176,6 +180,7 @@
              """.stripMargin
           }
           val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap)
+          groupTransStep.parentSteps += joinedTransStep
 
           // 7. final duplicate count
           val finalDupCountTableName = "__finalDupCount"
@@ -204,12 +209,13 @@
           }
           val finalDupCountTransStep =
             SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+          finalDupCountTransStep.parentSteps += groupTransStep
 
-          ((olderAliasTransStep :: joinedTransStep
-            :: groupTransStep :: finalDupCountTransStep :: Nil,
-            targetDsUpdateWriteStep :: Nil), finalDupCountTableName)
+          ((finalDupCountTransStep :: Nil, targetDsUpdateWriteStep :: Nil),
+            finalDupCountTableName)
         case _ =>
-          ((Nil, Nil), selfGroupTableName)
+          ((selfGroupTransStep :: Nil, totalMetricWriteStep :: Nil),
+            selfGroupTableName)
       }
 
       // 8. distinct metric
@@ -262,6 +268,7 @@
                """.stripMargin
           }
           val rnTransStep = SparkSqlTransformStep(rnTableName, rnSql, emptyMap)
+          rnTransStep.parentSteps += informedTransStep
 
           // 11. recognize duplicate items
           val dupItemsTableName = "__dupItems"
@@ -272,6 +279,7 @@
                """.stripMargin
           }
           val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
+          dupItemsTransStep.parentSteps += rnTransStep
           val dupItemsWriteStep = {
             val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
             RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
@@ -289,6 +297,7 @@
           }
           val groupDupMetricTransStep =
             SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+          groupDupMetricTransStep.parentSteps += dupItemsTransStep
           val groupDupMetricWriteStep = {
             MetricWriteStep(duplicationArrayName,
               groupDupMetricTableName,
@@ -296,9 +305,7 @@
               writeTimestampOpt)
           }
 
-          val msteps = {
-            informedTransStep :: rnTransStep :: dupItemsTransStep :: groupDupMetricTransStep :: Nil
-          }
+          val msteps = groupDupMetricTransStep :: Nil
           val wsteps = if (recordEnable) {
             dupItemsWriteStep :: groupDupMetricWriteStep :: Nil
           } else {
@@ -344,6 +351,7 @@
               """.stripMargin
           }
           val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
+          dupMetricTransStep.parentSteps += dupRecordTransStep
           val dupMetricWriteStep = {
             MetricWriteStep(
               duplicationArrayName,
@@ -353,9 +361,7 @@
             )
           }
 
-          val msteps = {
-            dupRecordTransStep :: dupMetricTransStep :: Nil
-          }
+          val msteps = dupMetricTransStep :: Nil
           val wsteps = if (recordEnable) {
             dupRecordWriteStep :: dupMetricWriteStep :: Nil
           } else {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 71eb452..03c2c8d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -105,6 +105,7 @@
           s"FROM `${inTimeTableName}`"
       }
       val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
+      latencyTransStep.parentSteps += inTimeTransStep
 
       // 3. timeliness metric
       val metricTableName = ruleParam.getOutDfName()
@@ -129,6 +130,7 @@
            """.stripMargin
       }
       val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
+      metricTransStep.parentSteps += latencyTransStep
       val metricWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
@@ -137,7 +139,7 @@
       }
 
       // current steps
-      val transSteps1 = inTimeTransStep :: latencyTransStep :: metricTransStep :: Nil
+      val transSteps1 = metricTransStep :: Nil
       val writeSteps1 = metricWriteStep :: Nil
 
       // 4. timeliness record
@@ -190,11 +192,12 @@
           }
           val rangeMetricTransStep =
             SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+          rangeMetricTransStep.parentSteps += rangeTransStep
           val rangeMetricWriteStep = {
             MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
           }
 
-          (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
+          (rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
         case _ => (Nil, Nil)
       }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 28e9d48..7f259ea 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -64,7 +64,7 @@
       warn(s"[${timestamp}] data source ${sourceName} not exists")
       Nil
     } else if (!context.runTimeTableRegister.existsTable(targetName)) {
-      println(s"[${timestamp}] data source ${targetName} not exists")
+      warn(s"[${timestamp}] data source ${targetName} not exists")
       Nil
     } else {
       val selItemsClause = analyzer.selectionPairs.map { pair =>
@@ -104,6 +104,8 @@
         s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
       }
       val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)
+      joinedTransStep.parentSteps += sourceTransStep
+      joinedTransStep.parentSteps += targetTransStep
 
       // 4. group
       val groupTableName = "__group"
@@ -116,6 +118,7 @@
           s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
       }
       val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)
+      groupTransStep.parentSteps += joinedTransStep
 
       // 5. total metric
       val totalTableName = "__totalMetric"
@@ -138,6 +141,7 @@
       }
       val uniqueRecordTransStep =
         SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
+      uniqueRecordTransStep.parentSteps += groupTransStep
 
       // 7. unique metric
       val uniqueTableName = "__uniqueMetric"
@@ -152,12 +156,12 @@
            """.stripMargin
       }
       val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
+      uniqueTransStep.parentSteps += uniqueRecordTransStep
 
       val uniqueMetricWriteStep =
         MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
 
-      val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
-        totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
+      val transSteps1 = totalTransStep :: uniqueTransStep :: Nil
       val writeSteps1 = totalMetricWriteStep :: uniqueMetricWriteStep :: Nil
 
       val duplicationArrayName = details.getString(_duplicationArray, "")
@@ -198,11 +202,12 @@
           """.stripMargin
         }
         val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
+        dupMetricTransStep.parentSteps += dupRecordTransStep
         val dupMetricWriteStep = {
           MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType)
         }
 
-        (dupRecordTransStep :: dupMetricTransStep :: Nil,
+        (dupMetricTransStep :: Nil,
           dupRecordWriteStep :: dupMetricWriteStep :: Nil)
       } else (Nil, Nil)
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 4ac35b2..b07595a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -30,7 +30,7 @@
                                      cache: Boolean = false
                                     ) extends TransformStep {
 
-  def execute(context: DQContext): Boolean = {
+  def doExecute(context: DQContext): Boolean = {
     val sqlContext = context.sqlContext
     try {
       val df = rule match {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 39b6a0e..59ea822 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -29,7 +29,7 @@
                                  cache: Boolean = false
                                 ) extends TransformStep {
 
-  def execute(context: DQContext): Boolean = {
+  def doExecute(context: DQContext): Boolean = {
     val sqlContext = context.sqlContext
     try {
       val df = sqlContext.sql(rule)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
index 995ce49..8c094b2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
@@ -18,7 +18,15 @@
 */
 package org.apache.griffin.measure.step.transform
 
+import scala.collection.mutable.HashSet
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.DQStepStatus._
+import org.apache.griffin.measure.utils.ThreadUtils
 
 trait TransformStep extends DQStep {
 
@@ -28,4 +36,75 @@
 
   val cache: Boolean
 
+  var status = PENDING
+
+  val parentSteps = new HashSet[TransformStep]
+
+  def doExecute(context: DQContext): Boolean
+
+  def execute(context: DQContext): Boolean = {
+    val threadName = Thread.currentThread().getName
+    info(threadName + " bigin transform step : \n" + debugString())
+    // Submit parents Steps
+    val parentStepFutures = parentSteps.filter(checkAndUpdateStatus).map { parentStep =>
+      Future {
+        val result = parentStep.execute(context)
+        parentStep.synchronized {
+          if (result) {
+            parentStep.status = COMPLETE
+          } else {
+            parentStep.status = FAILED
+          }
+        }
+      }(TransformStep.transformStepContext)
+    }
+    ThreadUtils.awaitResult(
+      Future.sequence(parentStepFutures)(implicitly, TransformStep.transformStepContext),
+      Duration.Inf)
+
+    parentSteps.map(step => {
+      while (step.status == RUNNING) {
+        Thread.sleep(1000L)
+      }
+    })
+    val prepared = parentSteps.foldLeft(true)((ret, step) => ret && step.status == COMPLETE)
+    if (prepared) {
+      val res = doExecute(context)
+      info(threadName + " end transform step : \n" + debugString())
+      res
+    } else {
+      error("Parent transform step failed!")
+      false
+    }
+  }
+
+  def checkAndUpdateStatus(step: TransformStep): Boolean = {
+    step.synchronized {
+      if (step.status == PENDING) {
+        step.status = RUNNING
+        true
+      } else {
+        false
+      }
+    }
+  }
+
+  def debugString(level: Int = 0): String = {
+    val stringBuffer = new StringBuilder
+    if (level > 0) {
+      for (i <- 0 to level - 1) {
+        stringBuffer.append("|   ")
+      }
+      stringBuffer.append("|---")
+    }
+    stringBuffer.append(name + "\n")
+    parentSteps.foreach(parentStep => stringBuffer.append(parentStep.debugString(level + 1)))
+    stringBuffer.toString()
+  }
 }
+
+object TransformStep {
+  private[transform] val transformStepContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("transform-step"))
+}
+
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
new file mode 100644
index 0000000..d484ec9
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
@@ -0,0 +1,227 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import java.util.concurrent._
+
+import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.duration.Duration
+import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
+import scala.util.control.NonFatal
+
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+
+private[griffin] object ThreadUtils {
+
+  private val sameThreadExecutionContext =
+    ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor())
+
+  /**
+   * An `ExecutionContextExecutor` that runs each task in the thread that invokes `execute/submit`.
+   * The caller should make sure the tasks running in this `ExecutionContextExecutor` are short and
+   * never block.
+   */
+  def sameThread: ExecutionContextExecutor = sameThreadExecutionContext
+
+  /**
+   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
+   */
+  def namedThreadFactory(prefix: String): ThreadFactory = {
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
+  }
+
+  /**
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
+  /**
+   * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
+   * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
+   */
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
+  }
+
+  /**
+   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
+  /**
+   * Wrapper over newSingleThreadExecutor.
+   */
+  def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    Executors.newSingleThreadExecutor(threadFactory)
+  }
+
+  /**
+   * Wrapper over ScheduledThreadPoolExecutor.
+   */
+  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
+    // By default, a cancelled task is not automatically removed from the work queue until its delay
+    // elapses. We have to enable it manually.
+    executor.setRemoveOnCancelPolicy(true)
+    executor
+  }
+
+  /**
+   * Run a piece of code in a new thread and return the result. Exception in the new thread is
+   * thrown in the caller thread with an adjusted stack trace that removes references to this
+   * method for clarity. The exception stack traces will be like the following
+   *
+   * SomeException: exception-message
+   *   at CallerClass.body-method (sourcefile.scala)
+   *   at ... run in separate thread using org.apache.griffin.measure.utils.ThreadUtils ... ()
+   *   at CallerClass.caller-method (sourcefile.scala)
+   *   ...
+   */
+  def runInNewThread[T](
+      threadName: String,
+      isDaemon: Boolean = true)(body: => T): T = {
+    @volatile var exception: Option[Throwable] = None
+    @volatile var result: T = null.asInstanceOf[T]
+
+    val thread = new Thread(threadName) {
+      override def run(): Unit = {
+        try {
+          result = body
+        } catch {
+          case NonFatal(e) =>
+            exception = Some(e)
+        }
+      }
+    }
+    thread.setDaemon(isDaemon)
+    thread.start()
+    thread.join()
+
+    exception match {
+      case Some(realException) =>
+        // Remove the part of the stack that shows method calls into this helper method
+        // This means drop everything from the top until the stack element
+        // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
+        val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+
+        // Remove the part of the new thread stack that shows methods call from this helper method
+        val extraStackTrace = realException.getStackTrace.takeWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName))
+
+        // Combine the two stack traces, with a place holder just specifying that there
+        // was a helper method used, without any further details of the helper
+        val placeHolderStackElem = new StackTraceElement(
+          s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
+          " ", "", -1)
+        val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+        // Update the stack trace and rethrow the exception in the caller thread
+        realException.setStackTrace(finalStackTrace)
+        throw realException
+      case None =>
+        result
+    }
+  }
+
+  /**
+   * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
+   */
+  def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+    // Custom factory to set thread names
+    val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
+      override def newThread(pool: SForkJoinPool) =
+        new SForkJoinWorkerThread(pool) {
+          setName(prefix + "-" + super.getName)
+        }
+    }
+    new SForkJoinPool(maxThreadNumber, factory,
+      null, // handler
+      false // asyncMode
+    )
+  }
+
+  // scalastyle:off awaitresult
+  /**
+   * Preferred alternative to `Await.result()`.
+   *
+   * This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
+   * that this thread's stack trace appears in logs.
+   *
+   * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
+   * `BlockingContext`. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
+   * As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
+   * method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
+   * In general, we should use this method because it's hard to debug when [[ThreadLocal]]s leak
+   * to other tasks.
+   */
+  @throws(classOf[Exception])
+  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.result(atMost)(awaitPermission)
+    } catch {
+      // TimeoutException is thrown in the current thread, so not need to warp the exception.
+      case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+        throw new Exception("Exception thrown in awaitResult: ", t)
+    }
+  }
+  // scalastyle:on awaitresult
+
+  // scalastyle:off awaitready
+  /**
+   * Preferred alternative to `Await.ready()`.
+   *
+   * @see [[awaitResult]]
+   */
+  @throws(classOf[Exception])
+  def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.ready(atMost)(awaitPermission)
+    } catch {
+      // TimeoutException is thrown in the current thread, so not need to warp the exception.
+      case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+        throw new Exception("Exception thrown in awaitResult: ", t)
+    }
+  }
+  // scalastyle:on awaitready
+}
\ No newline at end of file
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
new file mode 100644
index 0000000..e640c45
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.context.ContextId
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.step.transform.TransformStep
+
+import org.scalatest._
+
+class TransformStepTest extends FlatSpec with Matchers with DataFrameSuiteBase with Loggable {
+
+  case class DualTransformStep(name: String,
+                               duration: Int,
+                               rule: String = "",
+                               details: Map[String, Any] = Map(),
+                               cache: Boolean = false
+                              ) extends TransformStep {
+
+    def doExecute(context: DQContext): Boolean = {
+      val threadName = Thread.currentThread().getName
+      info(s"Step $name started with $threadName")
+      Thread.sleep(duration * 1000L)
+      info(s"Step $name finished with $threadName")
+      true
+    }
+  }
+
+  private def getDqContext(name: String = "test-context"): DQContext = {
+    DQContext(
+      ContextId(System.currentTimeMillis),
+      name,
+      Nil,
+      Nil,
+      BatchProcessType
+    )(spark)
+  }
+
+  /**
+    * Run transform steps in parallel. Here are the dependencies of transform steps
+    *
+    * step5
+    * |   |---step2
+    * |   |   |---step1
+    * |   |---step3
+    * |   |   |---step1
+    * |   |---step4
+    *
+    * step1 : -->
+    * step2 :    --->
+    * step3 :    ---->
+    * step4 : ->
+    * step5 :         -->
+    *
+    */
+  "transform step " should "be run steps in parallel" in {
+    val step1 = DualTransformStep("step1", 3)
+    val step2 = DualTransformStep("step2", 4)
+    step2.parentSteps += step1
+    val step3 = DualTransformStep("step3", 5)
+    step3.parentSteps += step1
+    val step4 = DualTransformStep("step4", 2)
+    val step5 = DualTransformStep("step5", 3)
+    step5.parentSteps += step2
+    step5.parentSteps += step3
+    step5.parentSteps += step4
+
+    val context = getDqContext()
+    step5.execute(context) should be (true)
+  }
+}