[GRIFFIN-358] Allow users to run old "evaluate.rule" configs as well
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index f01e373..1765f4d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -77,31 +77,38 @@
     assert(dataSources != null, "data.sources should not be null")
     getDataSources.foreach(_.validate())
 
-    assert(measures != null, "measures should not be null")
-    assert(measures.nonEmpty, "No measures were defined")
-    measures.foreach(_.validate())
+    if (measures != null && measures.nonEmpty) {
+      measures.foreach(_.validate())
 
-    val repeatedMeasures = getMeasures
-      .map(_.getName)
-      .groupBy(x => x)
-      .mapValues(_.size)
-      .filter(_._2 > 1)
-      .keys
-    assert(
-      repeatedMeasures.isEmpty,
-      s"Measure names must be unique. " +
-        s"Duplicate Measures names ['${repeatedMeasures.mkString("', '")}'] were found.")
+      val repeatedMeasures = measures
+        .map(_.getName)
+        .groupBy(x => x)
+        .mapValues(_.size)
+        .filter(_._2 > 1)
+        .keys
 
-    val invalidMeasureSources = getMeasures
-      .map(_.getDataSource)
-      .map(dataSource => (dataSource, getDataSources.exists(_.getName.matches(dataSource))))
-      .filterNot(_._2)
-      .map(_._1)
+      assert(
+        repeatedMeasures.isEmpty,
+        s"Measure names must be unique. " +
+          s"Duplicate measure names ['${repeatedMeasures.mkString("', '")}'] were found.")
 
-    assert(
-      invalidMeasureSources.isEmpty,
-      s"Measure source(s) undefined." +
-        s" Unknown source(s) ['${invalidMeasureSources.mkString("', '")}'] were found.")
+      val invalidMeasureSources = measures
+        .map(_.getDataSource)
+        .map(dataSource => (dataSource, getDataSources.exists(_.getName.matches(dataSource))))
+        .filterNot(_._2)
+        .map(_._1)
+
+      assert(
+        invalidMeasureSources.isEmpty,
+        s"Measure source(s) undefined." +
+          s" Unknown source(s) ['${invalidMeasureSources.mkString("', '")}'] were found.")
+    } else if (evaluateRule != null) {
+      evaluateRule.validate()
+    } else {
+      assert(
+        assertion = false,
+        "Either 'measures' or 'evaluate.rule' must be defined in the Application Config.")
+    }
   }
 }
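
The validation now prefers a non-empty `measures` list, falls back to a non-null `evaluateRule` (the legacy "evaluate.rule" block), and fails only when both are absent. For reference, a standalone sketch of the duplicate-name check kept inside the `measures` branch; the measure names below are hypothetical, not taken from this patch:

```scala
object DuplicateNameSketch extends App {
  // Hypothetical measure names; "completeness" appears twice.
  val names = Seq("accuracy", "completeness", "completeness", "profiling")

  // Same shape as DQConfig.validate: group identical names,
  // keep only those seen more than once.
  val repeated = names
    .groupBy(identity)
    .mapValues(_.size)
    .filter(_._2 > 1)
    .keys

  println(s"Duplicate measure names ['${repeated.mkString("', '")}'] were found.")
  // prints: Duplicate measure names ['completeness'] were found.
}
```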
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
index b2da9ca..254f685 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
@@ -51,15 +51,12 @@
     val dataSource = addColumnPrefix(originalSource, SourcePrefixStr)
 
     val refDataSource =
-      addColumnPrefix(
-        sparkSession.read.table(refSource).drop(ConstantColumns.tmst),
-        refPrefixStr)
+      addColumnPrefix(sparkSession.read.table(refSource).drop(ConstantColumns.tmst), refPrefixStr)
 
     val accuracyExprs = exprOpt.get
       .map(toAccuracyExpr)
       .distinct
-      .map(x =>
-        AccuracyExpr(s"$SourcePrefixStr${x.sourceCol}", s"$refPrefixStr${x.refCol}"))
+      .map(x => AccuracyExpr(s"$SourcePrefixStr${x.sourceCol}", s"$refPrefixStr${x.refCol}"))
 
     val joinExpr =
       accuracyExprs
@@ -68,8 +65,7 @@
 
     val indicatorExpr =
       accuracyExprs
-        .map(e =>
-          coalesce(col(e.sourceCol), emptyCol) notEqual coalesce(col(e.refCol), emptyCol))
+        .map(e => coalesce(col(e.sourceCol), emptyCol) notEqual coalesce(col(e.refCol), emptyCol))
         .reduce(_ or _)
 
     val nullExpr = accuracyExprs.map(e => col(e.sourceCol).isNull).reduce(_ or _)
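
The two hunks above are scalafmt-style reflows with no behavior change; the interesting logic is the null-safe mismatch test they wrap. A minimal, self-contained sketch of the same pattern, assuming a toy joined row set and `lit("")` as the empty sentinel (both are assumptions, not taken from the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit}

object NullSafeMismatchSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  // Toy source/reference pair, already joined into one row set.
  val df = Seq(
    (Option("x"), Option("x")),                   // equal values: no mismatch
    (Option.empty[String], Option.empty[String]), // both NULL: coalesce makes them equal
    (Option("x"), Option.empty[String])           // value vs NULL: mismatch
  ).toDF("__source_col", "__ref_col")

  // coalesce() maps NULL on either side to the sentinel, so NULL vs NULL does
  // not register as a mismatch; per-column predicates are then OR-ed together.
  val emptyCol = lit("")
  val indicator =
    coalesce(col("__source_col"), emptyCol) notEqual coalesce(col("__ref_col"), emptyCol)

  df.withColumn("is_mismatch", indicator).show()
  spark.stop()
}
```

Note the trade-off inherited from this pattern: a NULL on one side and a literal empty string on the other also compare as equal.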
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 1e5a930..9c03f98 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -29,6 +29,7 @@
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.execution.MeasureExecutor
+import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
 import org.apache.griffin.measure.utils.CommonUtils
@@ -80,16 +81,24 @@
           val applicationId = sparkSession.sparkContext.applicationId
           dqContext.getSinks.foreach(_.open(applicationId))
 
-          Try {
-            val t = Try(MeasureExecutor(dqContext).execute(dqParam.getMeasures))
+          if (dqParam.getMeasures != null && dqParam.getMeasures.nonEmpty) {
+            Try {
+              val t = Try(MeasureExecutor(dqContext).execute(dqParam.getMeasures))
 
-            t match {
-              case Success(_) =>
-              case Failure(exception) =>
-                error("Exception", exception)
+              t match {
+                case Success(_) =>
+                case Failure(exception) =>
+                  error("Exception", exception)
+              }
+
+              t.isSuccess
             }
+          } else {
+            // build the DQ job from the legacy evaluate.rule definition
+            val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
 
-            t.isSuccess
+            // execute the built DQ job against the current context
+            dqJob.execute(dqContext)
           }
         },
         TimeUnit.MILLISECONDS)
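
The same guard as in `DQConfig.validate` now selects the execution path at runtime: a non-empty measures list runs through `MeasureExecutor`, otherwise the legacy rule is handed to `DQJobBuilder`. A condensed sketch of that dispatch, with Griffin's context, executor, and builder types replaced by a hypothetical `Executable` stand-in:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for Griffin's measures and DQ jobs.
trait Executable { def execute(): Unit }

def runConfiguredChecks(measures: Seq[Executable], legacyJob: => Executable): Boolean =
  if (measures != null && measures.nonEmpty) {
    // New path: run the measures, log any failure, report the success flag.
    val t = Try(measures.foreach(_.execute()))
    t match {
      case Success(_) =>
      case Failure(exception) => Console.err.println(s"Exception: $exception")
    }
    t.isSuccess
  } else {
    // Legacy path: build a job from the old "evaluate.rule" block and run it.
    Try(legacyJob.execute()).isSuccess
  }
```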
diff --git a/measure/src/test/resources/_no_measure_or_rules_malformed.json b/measure/src/test/resources/_no_measure_or_rules_malformed.json
new file mode 100644
index 0000000..0f96a93
--- /dev/null
+++ b/measure/src/test/resources/_no_measure_or_rules_malformed.json
@@ -0,0 +1,24 @@
+{
+  "name": "prof_batch",
+  "process.type": "batch",
+  "timestamp": 123456,
+  "data.sources": [
+    {
+      "name": "source",
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "dataframe.name": "this_table",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
+        },
+        "pre.proc": [
+          "select * from this_table where user_id < 10014"
+        ]
+      }
+    }
+  ],
+  "sinks": [
+    "CONSOLESink"
+  ]
+}
\ No newline at end of file
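
This resource deliberately carries neither a `measures` list nor an `evaluate.rule` block, so `DQConfig.validate` falls through to the final assert. For contrast, a legacy-style config that the patch re-enables would look roughly like the sketch below; the `evaluate.rule` contents are illustrative only, not taken from this repository:

```json
{
  "name": "prof_batch_legacy",
  "process.type": "batch",
  "data.sources": [
    {
      "name": "source",
      "connector": {
        "type": "avro",
        "config": { "file.name": "src/test/resources/users_info_src.avro" }
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "PROFILING",
        "out.dataframe.name": "prof",
        "rule": "source.user_id.count()"
      }
    ]
  },
  "sinks": ["CONSOLESink"]
}
```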
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index 868b0ed..30c102a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -169,10 +169,14 @@
   }
 
   "batch job" should "fail with exception caught due to invalid rules" in {
-    assertThrows[java.lang.AssertionError] {
+    assertThrows[org.apache.spark.sql.AnalysisException] {
       runApp("/_profiling-batch-griffindsl_malformed.json")
     }
+  }
 
-    assertThrows[NullPointerException](runAndCheckException)
+  "batch job" should "fail with exception when no measures or rules are defined" in {
+    assertThrows[AssertionError] {
+      runApp("/_no_measure_or_rules_malformed.json")
+    }
   }
 }
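
The new test relies on `scala.Predef.assert` throwing `java.lang.AssertionError`. One subtlety worth noting: inside a ScalaTest suite, a bare `assert` resolves to ScalaTest's assertion macro (which throws `TestFailedException`), so the error must originate outside the suite, as it does in `DQConfig.validate`. A minimal sketch of that interaction, with a hypothetical validation stand-in (ScalaTest 3.1+ class names assumed):

```scala
import org.scalatest.flatspec.AnyFlatSpec

// Stand-in living outside the suite so scala.Predef.assert (which throws
// java.lang.AssertionError) is the assert being exercised, not ScalaTest's macro.
object ValidationSketch {
  def validate(measures: Seq[String], evaluateRule: AnyRef): Unit =
    if (measures != null && measures.nonEmpty) ()
    else if (evaluateRule != null) ()
    else
      assert(
        assertion = false,
        "Either 'measures' or 'evaluate.rule' must be defined in the Application Config.")
}

class AssertPropagationSketch extends AnyFlatSpec {
  "validation" should "raise AssertionError when both configs are absent" in {
    assertThrows[AssertionError] {
      ValidationSketch.validate(null, null)
    }
  }
}
```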