[GRIFFIN-316] Fix job exception handling

**What changes were proposed in this pull request?**

Currently we are using a Try instance to represent the result of a DQ job, whether it succeeded or failed. But since we only wrap the Boolean result by applying "Try" at the outermost level, an underlying failure cannot be caught, and the job would always return "Success" even when an exception occurred.

This PR modifies all the underlying execute/doExecute methods of a DQ job to handle exceptions with "Try" instances, so that failures are propagated properly to users when things go wrong.

**Does this PR introduce any user-facing change?**
No.

**How was this patch tested?**
Griffin test suite.

Author: Yu <yu.liu003@gmail.com>

Closes #562 from PnPie/exception_catch.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
index 4b19cd6..81e0867 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
@@ -17,16 +17,22 @@
 
 package org.apache.griffin.measure.job
 
+import scala.util.{Failure, Success, Try}
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 
 case class DQJob(dqSteps: Seq[DQStep]) extends Serializable {
 
-  /**
-   * @return execution success
-   */
-  def execute(context: DQContext): Boolean = {
-    dqSteps.forall(dqStep => dqStep.execute(context))
+  def execute(context: DQContext): Try[Boolean] = {
+    dqSteps
+      .map(_.execute(context))
+      .foldLeft(Try(true)) { (ret, stepResult) =>
+        (ret, stepResult) match {
+          case (Success(_), nextResult) => nextResult
+          case (Failure(_), _) => ret
+        }
+      }
   }
 
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index b23dd93..dc1fb52 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -59,7 +59,7 @@
     GriffinUDFAgent.register(sparkSession)
   }
 
-  def run: Try[Boolean] = Try {
+  def run: Try[Boolean] = {
     // start time
     val startTime = new Date().getTime
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
index a6eb95a..0b4527c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step
 
+import scala.util.Try
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.context.DQContext
 
@@ -27,7 +29,7 @@
   /**
    * @return execution success
    */
-  def execute(context: DQContext): Boolean
+  def execute(context: DQContext): Try[Boolean]
 
   def getNames: Seq[String] = name :: Nil
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
index 0eaea64..5ee5740 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step
 
+import scala.util.{Failure, Success, Try}
+
 import org.apache.griffin.measure.context.DQContext
 
 /**
@@ -31,8 +33,15 @@
   /**
    * @return execution success
    */
-  def execute(context: DQContext): Boolean = {
-    dqSteps.forall(dqStep => dqStep.execute(context))
+  def execute(context: DQContext): Try[Boolean] = {
+    dqSteps
+      .map(_.execute(context))
+      .foldLeft(Try(true))((ret, stepResult) => {
+        (ret, stepResult) match {
+          case (Success(_), nextResult) => nextResult
+          case (Failure(_), _) => ret
+        }
+      })
   }
 
   override def getNames: Seq[String] = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
index 11582d8..d358189 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
@@ -18,6 +18,7 @@
 package org.apache.griffin.measure.step.read
 
 import org.apache.spark.sql._
+import scala.util.Try
 
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
@@ -28,7 +29,7 @@
 
   val cache: Boolean
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     info(s"read data source [$name]")
     read(context) match {
       case Some(df) =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 1b3fb33..1d06146 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step.transform
 
+import scala.util.Try
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.write.WriteStep
 
@@ -32,14 +34,13 @@
     cache: Boolean = false)
     extends TransformStep {
 
-  def doExecute(context: DQContext): Boolean = {
-    val sparkSession = context.sparkSession
-    try {
+  def doExecute(context: DQContext): Try[Boolean] =
+    Try {
+      val sparkSession = context.sparkSession
       val df = rule match {
         case DataFrameOps._fromJson => DataFrameOps.fromJson(sparkSession, inputDfName, details)
         case DataFrameOps._accuracy =>
           DataFrameOps.accuracy(sparkSession, inputDfName, context.contextId, details)
-
         case DataFrameOps._clear => DataFrameOps.clear(sparkSession, inputDfName, details)
         case _ => throw new Exception(s"df opr [ $rule ] not supported")
       }
@@ -47,13 +48,8 @@
       context.runTimeTableRegister.registerTable(name, df)
       writeStepOpt match {
         case Some(writeStep) => writeStep.execute(context)
-        case None => true
+        case None => Try(true)
       }
-    } catch {
-      case e: Throwable =>
-        error(s"run data frame ops [ $rule ] error: ${e.getMessage}", e)
-        false
-    }
-  }
+    }.flatten
 
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index fc0306f..8b6e203 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step.transform
 
+import scala.util.Try
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.write.WriteStep
 
@@ -30,21 +32,17 @@
     writeStepOpt: Option[T] = None,
     cache: Boolean = false)
     extends TransformStep {
-  def doExecute(context: DQContext): Boolean = {
-    val sparkSession = context.sparkSession
-    try {
+
+  def doExecute(context: DQContext): Try[Boolean] =
+    Try {
+      val sparkSession = context.sparkSession
       val df = sparkSession.sql(rule)
       if (cache) context.dataFrameCache.cacheDataFrame(name, df)
       context.runTimeTableRegister.registerTable(name, df)
       writeStepOpt match {
         case Some(writeStep) => writeStep.execute(context)
-        case None => true
+        case None => Try(true)
       }
-    } catch {
-      case e: Throwable =>
-        error(s"run spark sql [ $rule ] error: ${e.getMessage}", e)
-        false
-    }
-  }
+    }.flatten
 
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
index e2dfdd1..90c2cb0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
@@ -18,10 +18,10 @@
 package org.apache.griffin.measure.step.transform
 
 import scala.collection.mutable
-import scala.collection.mutable.HashSet
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
 
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.{DQStep, DQStepStatus}
@@ -40,41 +40,51 @@
 
   val parentSteps = new mutable.HashSet[TransformStep]
 
-  def doExecute(context: DQContext): Boolean
+  def doExecute(context: DQContext): Try[Boolean]
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = {
+
     val threadName = Thread.currentThread().getName
     info(threadName + " begin transform step : \n" + debugString())
+
     // Submit parents Steps
     val parentStepFutures = parentSteps.filter(checkAndUpdateStatus).map { parentStep =>
       Future {
         val result = parentStep.execute(context)
         parentStep.synchronized {
-          if (result) {
-            parentStep.status = COMPLETE
-          } else {
-            parentStep.status = FAILED
+          result match {
+            case Success(_) => parentStep.status = COMPLETE
+            case Failure(_) => parentStep.status = FAILED
           }
         }
+        result
       }(TransformStep.transformStepContext)
     }
-    ThreadUtils.awaitResult(
+
+    val parentsResultSet = ThreadUtils.awaitResult(
       Future.sequence(parentStepFutures)(implicitly, TransformStep.transformStepContext),
       Duration.Inf)
 
+    val parentsResult = parentsResultSet.foldLeft(Try(true)) { (ret, step) =>
+      (ret, step) match {
+        case (Success(_), nextResult) => nextResult
+        case (Failure(_), _) => ret
+      }
+    }
+
     parentSteps.foreach(step => {
       while (step.status == RUNNING) {
         Thread.sleep(1000L)
       }
     })
-    val prepared = parentSteps.forall(step => step.status == COMPLETE)
-    if (prepared) {
-      val res = doExecute(context)
-      info(threadName + " end transform step : \n" + debugString())
-      res
-    } else {
-      error("Parent transform step failed!")
-      false
+
+    parentsResult match {
+      case Success(_) =>
+        info(threadName + " end transform step : \n" + debugString())
+        doExecute(context)
+      case Failure(_) =>
+        error("Parent transform step failed!")
+        parentsResult
     }
   }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index c1af659..3faed1b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -19,6 +19,7 @@
 
 import org.apache.commons.lang.StringUtils
 import org.apache.spark.sql.DataFrame
+import scala.util.Try
 
 import org.apache.griffin.measure.context.DQContext
 
@@ -30,7 +31,7 @@
   val name: String = ""
   val writeTimestampOpt: Option[Long] = None
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     getDataSourceCacheUpdateDf(context) match {
       case Some(df) =>
         context.dataSources
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 40754e2..ed4bc54 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step.write
 
+import scala.util.Try
+
 import org.apache.griffin.measure.context.DQContext
 
 /**
@@ -28,7 +30,7 @@
   val inputName: String = ""
   val writeTimestampOpt: Option[Long] = None
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
       val (t, metric) = pair
       val pr = try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index cdf337b..8f7f0c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.step.write
 
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.enums.{SimpleMode, TimestampMode}
 import org.apache.griffin.measure.configuration.enums.FlattenType.{
   ArrayFlattenType,
@@ -42,7 +44,7 @@
   val emptyMetricMap: Map[Long, Map[String, Any]] = Map[Long, Map[String, Any]]()
   val emptyMap: Map[String, Any] = Map[String, Any]()
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)
 
     // get metric list from data frame
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 01db3fe..975bdc5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -19,6 +19,7 @@
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
+import scala.util.Try
 
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
@@ -35,7 +36,7 @@
     writeTimestampOpt: Option[Long] = None)
     extends WriteStep {
 
-  def execute(context: DQContext): Boolean = {
+  def execute(context: DQContext): Try[Boolean] = Try {
     val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)
 
     val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json b/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
new file mode 100644
index 0000000..e8c72f9
--- /dev/null
+++ b/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
@@ -0,0 +1,43 @@
+{
+  "name": "prof_batch",
+  "process.type": "batch",
+  "timestamp": 123456,
+  "data.sources": [
+    {
+      "name": "source",
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "dataframe.name": "this_table",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "spark-sql",
+            "rule": "select * from this_table where user_id < 10014"
+          }
+        ]
+      }
+    }
+  ],
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "out.dataframe.name": "prof",
+        "rule": "abc",
+        "out":[
+          {
+            "type": "metric",
+            "name": "prof",
+            "flatten": "array"
+          }
+        ]
+      }
+    ]
+  },
+
+  "sinks": ["CONSOLE"]
+}
\ No newline at end of file
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index 633974c..a95d76e 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.griffin.measure.job
 
+import org.apache.spark.sql.AnalysisException
+import scala.reflect.ClassTag
 import scala.util.{Failure, Success, Try}
 
 import org.apache.griffin.measure.Application.readParamFile
@@ -69,6 +71,15 @@
     dqContext.metricWrapper.metrics should equal(expectedMetrics)
   }
 
+  def runAndCheckException[T <: AnyRef](implicit classTag: ClassTag[T]): Unit = {
+    dqApp.run match {
+      case Success(_) =>
+        fail(
+          s"job ${dqApp.dqParam.getName} should not succeed, a ${classTag.toString} exception is expected.")
+      case Failure(ex) => assertThrows[T](throw ex)
+    }
+  }
+
   "accuracy batch job" should "work" in {
     dqApp = initApp("/_accuracy-batch-griffindsl.json")
     val expectedMetrics = Map(
@@ -139,4 +150,10 @@
 
     runAndCheckResult(expectedMetrics)
   }
+
+  "batch job" should "fail with exception caught due to invalid rules" in {
+    dqApp = initApp("/_profiling-batch-griffindsl_malformed.json")
+
+    runAndCheckException[AnalysisException]
+  }
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index a05ade6..a557dda 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -43,7 +43,12 @@
   var dqApp: DQApp = _
 
   def getConfigFilePath(fileName: String): String = {
-    getClass.getResource(fileName).getFile
+    try {
+      getClass.getResource(fileName).getFile
+    } catch {
+      case _: NullPointerException => throw new Exception(s"resource [$fileName] not found")
+      case ex: Throwable => throw ex
+    }
   }
 
   def initApp(dqParamFile: String): DQApp = {
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index 5a227a0..834d8e0 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -18,6 +18,7 @@
 package org.apache.griffin.measure.step
 
 import org.scalatest._
+import scala.util.Try
 
 import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
 import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
@@ -34,7 +35,7 @@
       cache: Boolean = false)
       extends TransformStep {
 
-    def doExecute(context: DQContext): Boolean = {
+    def doExecute(context: DQContext): Try[Boolean] = Try {
       val threadName = Thread.currentThread().getName
       info(s"Step $name started with $threadName")
       Thread.sleep(duration * 1000L)
@@ -77,6 +78,6 @@
     step5.parentSteps += step4
 
     val context = getDqContext()
-    step5.execute(context) should be(true)
+    step5.execute(context).get should be(true)
   }
 }