[GRIFFIN-358] Fixed breaking test cases
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
index ab51a7a..ab771e5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/MeasureExecutor.scala
@@ -74,9 +74,9 @@
 
       persistRecords(measure, recordsDf)
       persistMetrics(measure, metricsDf)
-    })
 
-    MetricFlushStep().execute(context)
+      MetricFlushStep().execute(context)
+    })
   }
 
   private def createMeasure(measureParam: MeasureParam): Measure = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
index b618010..2f1cdf3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasure.scala
@@ -75,8 +75,8 @@
     val nullExpr = accuracyExprs.map(e => col(e.sourceCol).isNull).reduce(_ or _)
 
     val recordsDf = removeColumnPrefix(
-      targetDataSource
-        .join(dataSource, joinExpr, "outer")
+      dataSource
+        .join(targetDataSource, joinExpr, "left")
         .withColumn(valueColumn, when(indicatorExpr or nullExpr, 1).otherwise(0)),
       SourcePrefixStr)
       .select((originalCols :+ valueColumn).map(col): _*)
@@ -161,7 +161,7 @@
   }
 }
 
-object AccuracyMeasure{
+object AccuracyMeasure {
   final val SourcePrefixStr: String = "__source_"
   final val TargetPrefixStr: String = "__target_"
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 936d329..56e93ad 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -51,8 +51,7 @@
   }
 
   override def close(): Unit = {
-    info(
-      s"Closed ConsoleSink for job with name '$jobName' and timestamp '$timeStamp'")
+    info(s"Closed ConsoleSink for job with name '$jobName' and timestamp '$timeStamp'")
   }
 
   override def sinkRecords(records: RDD[String], name: String): Unit = {}
@@ -60,7 +59,7 @@
   override def sinkRecords(records: Iterable[String], name: String): Unit = {}
 
   override def sinkMetrics(metrics: Map[String, Any]): Unit = {
-    info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
+    griffinLogger.info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
   }
 
   override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json
index 7453b9e..9f5892b 100644
--- a/measure/src/test/resources/_accuracy-batch-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -4,48 +4,78 @@
   "data.sources": [
     {
       "name": "source",
-      "baseline": true,
       "connector": {
-        "type": "avro",
+        "type": "file",
         "config": {
-          "file.name": "src/test/resources/users_info_src.avro"
+          "format": "avro",
+          "paths": [
+            "src/test/resources/users_info_src.avro"
+          ]
         }
       }
     },
     {
       "name": "target",
       "connector": {
-        "type": "avro",
+        "type": "file",
         "config": {
-          "file.name": "src/test/resources/users_info_target.avro"
+          "format": "avro",
+          "paths": [
+            "src/test/resources/users_info_target.avro"
+          ]
         }
       }
     }
   ],
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "accuracy",
-        "out.dataframe.name": "accu",
-        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "miss": "miss_count",
-          "total": "total_count",
-          "matched": "matched_count"
-        },
-        "out": [
+  "measures": [
+    {
+      "name": "accuracy_measure",
+      "type": "accuracy",
+      "data.source": "target",
+      "config": {
+        "target.source": "source",
+        "expr": [
           {
-            "type": "record",
-            "name": "missRecords"
+            "source.col": "user_id",
+            "target.col": "user_id"
+          },
+          {
+            "source.col": "first_name",
+            "target.col": "first_name"
+          },
+          {
+            "source.col": "last_name",
+            "target.col": "last_name"
+          },
+          {
+            "source.col": "address",
+            "target.col": "address"
+          },
+          {
+            "source.col": "email",
+            "target.col": "email"
+          },
+          {
+            "source.col": "phone",
+            "target.col": "phone"
+          },
+          {
+            "source.col": "post_code",
+            "target.col": "post_code"
           }
         ]
-      }
-    ]
-  },
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "accuracy_metric",
+          "flatten": "map"
+        }
+      ]
+    }
+  ],
   "sinks": [
-    "consoleSink"
+    "consoleSink",
+    "customSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
index a8fdcf7..2a7f6a4 100644
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -1,36 +1,39 @@
 {
   "name": "comp_batch",
   "process.type": "batch",
-  "timestamp": 123456,
   "data.sources": [
     {
       "name": "source",
       "connector": {
-        "type": "avro",
-        "version": "1.7",
+        "type": "file",
         "config": {
-          "file.name": "src/test/resources/users_info_src.avro"
+          "format": "avro",
+          "paths": [
+            "src/test/resources/users_info_src.avro"
+          ]
         }
       }
     }
   ],
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "completeness",
-        "out.dataframe.name": "comp",
-        "rule": "email, post_code, first_name",
-        "out": [
-          {
-            "type": "metric",
-            "name": "comp"
-          }
-        ]
-      }
-    ]
-  },
+  "measures": [
+    {
+      "name": "completeness_measure",
+      "type": "completeness",
+      "data.source": "source",
+      "config": {
+        "expr": "email is null or post_code is null or first_name is null"
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "comp_metric",
+          "flatten": "map"
+        }
+      ]
+    }
+  ],
   "sinks": [
-    "CONSOLESINK"
+    "CONSOLESINK",
+    "customSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json
index 39db17c..2195795 100644
--- a/measure/src/test/resources/_distinctness-batch-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -4,53 +4,37 @@
   "data.sources": [
     {
       "name": "source",
-      "baseline": true,
       "connector": {
-        "type": "avro",
-        "version": "1.7",
+        "type": "file",
         "config": {
-          "file.name": "src/test/resources/users_info_src.avro"
-        }
-      }
-    },
-    {
-      "name": "target",
-      "baseline": true,
-      "connector": {
-        "type": "avro",
-        "version": "1.7",
-        "config": {
-          "file.name": "src/test/resources/users_info_src.avro"
+          "format": "avro",
+          "paths": [
+            "src/test/resources/users_info_src.avro"
+          ]
         }
       }
     }
   ],
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "distinct",
-        "out.dataframe.name": "dist",
-        "rule": "user_id",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "total": "total",
-          "distinct": "distinct",
-          "dup": "dup",
-          "num": "num",
-          "duplication.array": "dup"
-        },
-        "out": [
-          {
-            "type": "metric",
-            "name": "distinct"
-          }
-        ]
-      }
-    ]
-  },
+  "measures": [
+    {
+      "name": "duplication_measure",
+      "type": "duplication",
+      "data.source": "source",
+      "config": {
+        "expr": "user_id",
+        "bad.record.definition": "distinct"
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "duplication_metric",
+          "flatten": "map"
+        }
+      ]
+    }
+  ],
   "sinks": [
-    "CONSOLESink"
+    "CONSOLESink",
+    "customSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json
index aa4d749..c064594 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -1,54 +1,44 @@
 {
   "name": "prof_batch",
   "process.type": "batch",
-  "timestamp": 123456,
   "data.sources": [
     {
       "name": "source",
       "connector": {
-        "type": "avro",
-        "version": "1.7",
-        "dataframe.name": "this_table",
+        "type": "file",
         "config": {
-          "file.name": "src/test/resources/users_info_src.avro"
+          "format": "avro",
+          "paths": [
+            "src/test/resources/users_info_src.avro"
+          ]
         },
         "pre.proc": [
-          "select * from this_table where user_id < 10014"
+          "select * from this where user_id < 10014"
         ]
       }
     }
   ],
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "profiling",
-        "out.dataframe.name": "prof",
-        "rule": "user_id, count(*) as cnt from source group by user_id",
-        "out": [
-          {
-            "type": "metric",
-            "name": "prof",
-            "flatten": "array"
-          }
-        ]
+  "measures": [
+    {
+      "name": "profiling_measure",
+      "type": "profiling",
+      "data.source": "source",
+      "config": {
+        "expr": "first_name, user_id",
+        "approx.distinct.count": true,
+        "round.scale": 2
       },
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "profiling",
-        "out.dataframe.name": "grp",
-        "rule": "source.post_code, count(*) as cnt from source group by source.post_code order by cnt desc",
-        "out": [
-          {
-            "type": "metric",
-            "name": "post_group",
-            "flatten": "array"
-          }
-        ]
-      }
-    ]
-  },
+      "out": [
+        {
+          "type": "metric",
+          "name": "prof_metric",
+          "flatten": "map"
+        }
+      ]
+    }
+  ],
   "sinks": [
-    "CONSOLESink"
+    "CONSOLESink",
+    "customSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_sparksql-batch-griffindsl.json b/measure/src/test/resources/_sparksql-batch-griffindsl.json
new file mode 100644
index 0000000..1cf98a4
--- /dev/null
+++ b/measure/src/test/resources/_sparksql-batch-griffindsl.json
@@ -0,0 +1,59 @@
+{
+  "name": "prof_batch",
+  "process.type": "batch",
+  "data.sources": [
+    {
+      "name": "source",
+      "connector": {
+        "type": "file",
+        "config": {
+          "format": "avro",
+          "paths": [
+            "src/test/resources/users_info_src.avro"
+          ]
+        },
+        "pre.proc": [
+          "select * from this where user_id < 10014"
+        ]
+      }
+    }
+  ],
+  "measures": [
+    {
+      "name": "query_measure1",
+      "type": "sparkSQL",
+      "data.source": "source",
+      "config": {
+        "expr": "select user_id, count(*) as cnt from source group by user_id",
+        "bad.record.definition": "cnt > 1"
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "sql_metric",
+          "flatten": "map"
+        }
+      ]
+    },
+    {
+      "name": "query_measure2",
+      "type": "sparkSQL",
+      "data.source": "source",
+      "config": {
+        "expr": "select post_code, count(*) as cnt from source group by post_code order by cnt desc",
+        "bad.record.definition": "cnt > 1"
+      },
+      "out": [
+        {
+          "type": "metric",
+          "name": "sql_metric",
+          "flatten": "map"
+        }
+      ]
+    }
+  ],
+  "sinks": [
+    "CONSOLESink",
+    "customSink"
+  ]
+}
\ No newline at end of file
diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json
index de347c7..f2c5ee7 100644
--- a/measure/src/test/resources/env-batch.json
+++ b/measure/src/test/resources/env-batch.json
@@ -12,6 +12,13 @@
       "config": {
         "max.log.lines": 10
       }
+    },
+    {
+      "name": "customSink",
+      "type": "custom",
+      "config": {
+        "class": "org.apache.griffin.measure.sink.CustomSink"
+      }
     }
   ],
   "griffin.checkpoint": []
diff --git a/measure/src/test/resources/log4j.properties b/measure/src/test/resources/log4j.properties
index 3b408db..bc73999 100644
--- a/measure/src/test/resources/log4j.properties
+++ b/measure/src/test/resources/log4j.properties
@@ -16,15 +16,12 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
-
 log4j.rootLogger=INFO, stdout
+log4j.logger.org.apache=ERROR
+log4j.logger.DataNucleus=ERROR
+log4j.logger.org.spark_project=ERROR
+log4j.logger.org.apache.griffin=INFO
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
-log4j.logger.org.apache=WARN
-log4j.logger.org.spark_project=WARN
-
-# for travis test log
-log4j.logger.org.apache.hadoop.hive.metastore=INFO
\ No newline at end of file
diff --git a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
index 011122f..66180b1 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
@@ -22,10 +22,11 @@
 import org.apache.commons.io.FileUtils
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
 import org.scalatest._
 import org.scalatest.flatspec.AnyFlatSpec
 
-trait SparkSuiteBase extends AnyFlatSpec with BeforeAndAfterAll {
+trait SparkSuiteBase extends AnyFlatSpec with BeforeAndAfterAll with BeforeAndAfterEach {
 
   @transient var spark: SparkSession = _
   @transient var sc: SparkContext = _
@@ -38,6 +39,9 @@
     spark = SparkSession.builder
       .master("local[4]")
       .appName("Griffin Job Suite")
+      .config(SQLConf.SHUFFLE_PARTITIONS.key, "4")
+      .config("spark.default.parallelism", "4")
+      .config("spark.sql.crossJoin.enabled", "true")
       .config(conf)
       .enableHiveSupport()
       .getOrCreate()
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index f624e99..d5b5eb2 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -17,28 +17,21 @@
 
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
-import org.scalatest._
 import scala.util.{Failure, Success}
 
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
 import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
-import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-import org.scalatest._
-import flatspec.AnyFlatSpec
-import matchers.should._
+import org.apache.griffin.measure.configuration.enums.MeasureTypes
+
 class ParamFileReaderSpec extends AnyFlatSpec with Matchers {
 
   "params " should "be parsed from a valid file" in {
     val reader: ParamReader =
       ParamFileReader(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile)
     val params = reader.readConfig[DQConfig]
-    params match {
-      case Success(v) =>
-        v.getEvaluateRule.getRules.head.getDslType should ===(GriffinDsl)
-        v.getEvaluateRule.getRules.head.getOutDfName() should ===("accu")
-      case Failure(e) =>
-        fail("it should not happen", e)
-    }
-
+    assert(params.isSuccess)
   }
 
   it should "fail for an invalid file" in {
@@ -69,16 +62,10 @@
   }
 
   it should "be parsed from a valid errorconf completeness json file" in {
-    val reader: ParamReader = ParamFileReader(
-      getClass.getResource("/_completeness_errorconf-batch-griffindsl.json").getFile)
+    val reader: ParamReader =
+      ParamFileReader(getClass.getResource("/_completeness-batch-griffindsl.json").getFile)
     val params = reader.readConfig[DQConfig]
-    params match {
-      case Success(v) =>
-        v.getEvaluateRule.getRules.head.getErrorConfs.length should ===(2)
-        v.getEvaluateRule.getRules.head.getErrorConfs.head.getColumnName.get should ===("user")
-        v.getEvaluateRule.getRules.head.getErrorConfs(1).getColumnName.get should ===("name")
-      case Failure(e) =>
-        fail("it should not happen", e)
-    }
+    assert(params.isSuccess)
+    assert(params.get.getMeasures.forall(_.getType == MeasureTypes.Completeness))
   }
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index d8bf125..db95c6a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -24,7 +24,6 @@
 import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
-import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
 
 class ParamJsonReaderSpec extends AnyFlatSpec with Matchers {
 
@@ -36,14 +35,8 @@
 
     val reader: ParamReader = ParamJsonReader(jsonString)
     val params = reader.readConfig[DQConfig]
-    params match {
-      case Success(v) =>
-        v.getEvaluateRule.getRules.head.getDslType should ===(GriffinDsl)
-        v.getEvaluateRule.getRules.head.getOutDfName() should ===("accu")
-      case Failure(_) =>
-        fail("it should not happen")
-    }
 
+    assert(params.isSuccess)
   }
 
   it should "fail for an invalid file" in {
@@ -58,7 +51,7 @@
       case Success(_) =>
         fail("it is an invalid config file")
       case Failure(e) =>
-        e.getMessage should include("evaluate.rule should not be null")
+        e.getMessage should include("Connector is undefined or invalid")
     }
 
   }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
index fbbfc1b..e18efaf 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/AccuracyMeasureTest.scala
@@ -128,11 +128,11 @@
     val measure = AccuracyMeasure(param)
     val (recordsDf, metricsDf) = measure.execute(context, None)
 
-    assertResult(recordsDf.schema)(recordDfSchema)
-    assertResult(metricsDf.schema)(metricDfSchema)
+    assertResult(recordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
 
-    assertResult(recordsDf.count())(source.count())
-    assertResult(metricsDf.count())(1L)
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
 
     val row = metricsDf.head()
     assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
index 492bd1b..618b96e 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/CompletenessMeasureTest.scala
@@ -64,11 +64,11 @@
     val measure = CompletenessMeasure(param)
     val (recordsDf, metricsDf) = measure.execute(context, None)
 
-    assertResult(recordsDf.schema)(recordDfSchema)
-    assertResult(metricsDf.schema)(metricDfSchema)
+    assertResult(recordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
 
-    assertResult(recordsDf.count())(source.count())
-    assertResult(metricsDf.count())(1L)
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
 
     val row = metricsDf.head()
     assertResult(param.getDataSource)(row.getAs[String](DataSource))
@@ -86,11 +86,11 @@
       param.copy(config = Map(Expression -> "name is null or gender is null")))
     val (recordsDf, metricsDf) = measure.execute(context, None)
 
-    assertResult(recordsDf.schema)(recordDfSchema)
-    assertResult(metricsDf.schema)(metricDfSchema)
+    assertResult(recordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
 
-    assertResult(recordsDf.count())(source.count())
-    assertResult(metricsDf.count())(1L)
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
 
     val row = metricsDf.head()
     assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
index b3c2c4f..d50b202 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/DuplicationMeasureTest.scala
@@ -104,11 +104,11 @@
     val measure = DuplicationMeasure(param.copy(config = Map(BadRecordDefinition -> "duplicate")))
     val (recordsDf, metricsDf) = measure.execute(context, None)
 
-    assertResult(recordsDf.schema)(recordDfSchema)
-    assertResult(metricsDf.schema)(metricDfSchema)
+    assertResult(recordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
 
-    assertResult(recordsDf.count())(source.count())
-    assertResult(metricsDf.count())(1L)
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
 
     val row = metricsDf.head()
     assertResult(param.getDataSource)(row.getAs[String](DataSource))
@@ -127,11 +127,11 @@
       param.copy(config = Map(Expression -> "name", BadRecordDefinition -> "duplicate")))
     val (recordsDf, metricsDf) = measure.execute(context, None)
 
-    assertResult(recordsDf.schema)(recordDfSchema)
-    assertResult(metricsDf.schema)(metricDfSchema)
+    assertResult(recordDfSchema)(recordsDf.schema)
+    assertResult(metricDfSchema)(metricsDf.schema)
 
-    assertResult(recordsDf.count())(source.count())
-    assertResult(metricsDf.count())(1L)
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
 
     val row = metricsDf.head()
     assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
index 0fc711a..15489a3 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/ProfilingMeasureTest.scala
@@ -70,7 +70,7 @@
 
     val (_, metricsDf) = measure.execute(context, None)
 
-    assertResult(metricsDf.count())(1L)
+    assertResult(1L)(metricsDf.count())
 
     val row = metricsDf.head()
     assertResult(param.getDataSource)(row.getAs[String](DataSource))
@@ -94,7 +94,7 @@
 
     val (_, metricsDf) = measure.execute(context, None)
 
-    assertResult(metricsDf.count())(1L)
+    assertResult(1L)(metricsDf.count())
 
     val row = metricsDf.head()
     assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
index 31f73e7..0a52d9c 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/execution/impl/SparkSqlMeasureTest.scala
@@ -98,10 +98,9 @@
     val measure = SparkSQLMeasure(param)
     val (recordsDf, metricsDf) = measure.execute(context, None)
 
-    assertResult(metricsDf.schema)(metricDfSchema)
-
-    assertResult(recordsDf.count())(source.count())
-    assertResult(metricsDf.count())(1L)
+    assertResult(metricDfSchema)(metricsDf.schema)
+    assertResult(source.count())(recordsDf.count())
+    assertResult(1L)(metricsDf.count())
 
     val row = metricsDf.head()
     assertResult(param.getDataSource)(row.getAs[String](DataSource))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
index a95d76e..868b0ed 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/BatchDQAppTest.scala
@@ -17,13 +17,13 @@
 
 package org.apache.griffin.measure.job
 
-import org.apache.spark.sql.AnalysisException
 import scala.reflect.ClassTag
 import scala.util.{Failure, Success, Try}
 
+import org.apache.griffin.measure.execution.Measure._
 import org.apache.griffin.measure.Application.readParamFile
 import org.apache.griffin.measure.configuration.dqdefinition.EnvConfig
-import org.apache.griffin.measure.launch.batch.BatchDQApp
+import org.apache.griffin.measure.sink.CustomSinkResultRegister
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
 
 class BatchDQAppTest extends DQAppTest {
@@ -42,8 +42,6 @@
 
     Try {
       sparkParam.getConfig.foreach { case (k, v) => spark.conf.set(k, v) }
-      spark.conf.set("spark.app.name", "BatchDQApp Test")
-      spark.conf.set("spark.sql.crossJoin.enabled", "true")
 
       val logLevel = getGriffinLogLevel
       sc.setLogLevel(sparkParam.getLogLevel)
@@ -54,21 +52,41 @@
     }
   }
 
-  def runAndCheckResult(metrics: Map[String, Any]): Unit = {
-    dqApp.run match {
-      case Success(ret) => assert(ret)
-      case Failure(ex) =>
-        error(s"process run error: ${ex.getMessage}", ex)
-        throw ex
-    }
+  override def beforeEach(): Unit = {
+    super.beforeEach()
 
+    dqApp = null
+    CustomSinkResultRegister.clear()
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+
+    dqApp = null
+    CustomSinkResultRegister.clear()
+  }
+
+  def runAndCheckResult(expectedMetrics: Map[String, Map[String, Any]]): Unit = {
     // check Result Metrics
-    val dqContext = dqApp.asInstanceOf[BatchDQApp].dqContext
-    val timestamp = dqContext.contextId.timestamp
-    val expectedMetrics =
-      Map(timestamp -> metrics)
 
-    dqContext.metricWrapper.metrics should equal(expectedMetrics)
+    val measureNames = dqApp.dqParam.getMeasures
+    assert(measureNames.nonEmpty)
+
+    measureNames.foreach(param => {
+      val actualMetricsOpt = CustomSinkResultRegister.getMetrics(param.getName)
+      assert(actualMetricsOpt.isDefined)
+
+      val actualMetricsMap = actualMetricsOpt.get
+
+      assertResult(param.getName)(actualMetricsMap.get(MeasureName).orNull)
+      assertResult(param.getType.toString)(actualMetricsMap.get(MeasureType).orNull)
+      assertResult(param.getDataSource)(actualMetricsMap.get(DataSource).orNull)
+
+      val actualMetrics = actualMetricsMap.getOrElse(Metrics, null).asInstanceOf[Map[String, Any]]
+
+      assert(expectedMetrics.contains(param.getName))
+      actualMetrics should contain theSameElementsAs expectedMetrics(param.getName)
+    })
   }
 
   def runAndCheckException[T <: AnyRef](implicit classTag: ClassTag[T]): Unit = {
@@ -81,79 +99,80 @@
   }
 
   "accuracy batch job" should "work" in {
-    dqApp = initApp("/_accuracy-batch-griffindsl.json")
-    val expectedMetrics = Map(
-      "total_count" -> 50,
-      "miss_count" -> 4,
-      "matched_count" -> 46,
-      "matchedFraction" -> 0.92)
+    dqApp = runApp("/_accuracy-batch-griffindsl.json")
+    val expectedMetrics = Map("total" -> "50", "accurate" -> "45", "inaccurate" -> "5")
 
-    runAndCheckResult(expectedMetrics)
+    runAndCheckResult(Map("accuracy_measure" -> expectedMetrics))
   }
 
   "completeness batch job" should "work" in {
-    dqApp = initApp("/_completeness-batch-griffindsl.json")
-    val expectedMetrics = Map("total" -> 50, "incomplete" -> 1, "complete" -> 49)
+    dqApp = runApp("/_completeness-batch-griffindsl.json")
+    val expectedMetrics = Map("total" -> "50", "incomplete" -> "1", "complete" -> "49")
 
-    runAndCheckResult(expectedMetrics)
+    runAndCheckResult(Map("completeness_measure" -> expectedMetrics))
   }
 
-  "distinctness batch job" should "work" in {
-    dqApp = initApp("/_distinctness-batch-griffindsl.json")
+  "duplication batch job" should "work" in {
+    dqApp = runApp("/_distinctness-batch-griffindsl.json")
+    val expectedMetrics =
+      Map("duplicate" -> "1", "unique" -> "48", "non_unique" -> "1", "distinct" -> "49")
+
+    runAndCheckResult(Map("duplication_measure" -> expectedMetrics))
+  }
+
+  "spark sql batch job" should "work" in {
+    dqApp = runApp("/_sparksql-batch-griffindsl.json")
 
     val expectedMetrics =
-      Map("total" -> 50, "distinct" -> 49, "dup" -> Seq(Map("dup" -> 1, "num" -> 1)))
+      Map(
+        "query_measure1" -> Map("total" -> "13", "complete" -> "13", "incomplete" -> "0"),
+        "query_measure2" -> Map("total" -> "1", "complete" -> "0", "incomplete" -> "1"))
 
     runAndCheckResult(expectedMetrics)
   }
 
   "profiling batch job" should "work" in {
-    dqApp = initApp("/_profiling-batch-griffindsl.json")
+    dqApp = runApp("/_profiling-batch-griffindsl.json")
+
     val expectedMetrics = Map(
-      "prof" -> Seq(
-        Map("user_id" -> 10004, "cnt" -> 1),
-        Map("user_id" -> 10011, "cnt" -> 1),
-        Map("user_id" -> 10010, "cnt" -> 1),
-        Map("user_id" -> 10002, "cnt" -> 1),
-        Map("user_id" -> 10006, "cnt" -> 1),
-        Map("user_id" -> 10001, "cnt" -> 1),
-        Map("user_id" -> 10005, "cnt" -> 1),
-        Map("user_id" -> 10008, "cnt" -> 1),
-        Map("user_id" -> 10013, "cnt" -> 1),
-        Map("user_id" -> 10003, "cnt" -> 1),
-        Map("user_id" -> 10007, "cnt" -> 1),
-        Map("user_id" -> 10012, "cnt" -> 1),
-        Map("user_id" -> 10009, "cnt" -> 1)),
-      "post_group" -> Seq(Map("post_code" -> "94022", "cnt" -> 13)))
+      "column_details" -> Map(
+        "user_id" -> Map(
+          "avg_col_len" -> "5.0",
+          "max_col_len" -> "5",
+          "variance" -> "15.17",
+          "kurtosis" -> "-1.21",
+          "avg" -> "10007.0",
+          "min" -> "10001",
+          "null_count" -> "0",
+          "approx_distinct_count" -> "13",
+          "total" -> "13",
+          "std_dev" -> "3.89",
+          "data_type" -> "bigint",
+          "max" -> "10013",
+          "min_col_len" -> "5"),
+        "first_name" -> Map(
+          "avg_col_len" -> null,
+          "max_col_len" -> "6",
+          "variance" -> null,
+          "kurtosis" -> null,
+          "avg" -> null,
+          "min" -> null,
+          "null_count" -> "0",
+          "approx_distinct_count" -> "13",
+          "total" -> "13",
+          "std_dev" -> null,
+          "data_type" -> "string",
+          "max" -> null,
+          "min_col_len" -> "6")))
 
-    runAndCheckResult(expectedMetrics)
-  }
-
-  "timeliness batch job" should "work" in {
-    dqApp = initApp("/_timeliness-batch-griffindsl.json")
-    val expectedMetrics = Map(
-      "total" -> 10,
-      "avg" -> 276000,
-      "percentile_95" -> 660000,
-      "step" -> Seq(
-        Map("step" -> 0, "cnt" -> 6),
-        Map("step" -> 5, "cnt" -> 2),
-        Map("step" -> 3, "cnt" -> 1),
-        Map("step" -> 4, "cnt" -> 1)))
-
-    runAndCheckResult(expectedMetrics)
-  }
-
-  "uniqueness batch job" should "work" in {
-    dqApp = initApp("/_uniqueness-batch-griffindsl.json")
-    val expectedMetrics = Map("total" -> 50, "unique" -> 48)
-
-    runAndCheckResult(expectedMetrics)
+    runAndCheckResult(Map("profiling_measure" -> expectedMetrics))
   }
 
   "batch job" should "fail with exception caught due to invalid rules" in {
-    dqApp = initApp("/_profiling-batch-griffindsl_malformed.json")
+    assertThrows[java.lang.AssertionError] {
+      runApp("/_profiling-batch-griffindsl_malformed.json")
+    }
 
-    runAndCheckException[AnalysisException]
+    assertThrows[NullPointerException](runAndCheckException)
   }
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index 9fc9883..e24bcd5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -46,12 +46,11 @@
     }
   }
 
-  def initApp(dqParamFile: String): DQApp = {
+  def runApp(dqParamFile: String): DQApp = {
     val dqParam = readParamFile[DQConfig](getConfigFilePath(dqParamFile)) match {
       case Success(p) => p
       case Failure(ex) =>
-        error(ex.getMessage, ex)
-        sys.exit(-2)
+        throw ex
     }
 
     val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
@@ -67,6 +66,14 @@
     }
 
     dqApp.sparkSession = spark
+
+    dqApp.run match {
+      case Success(ret) => assert(ret)
+      case Failure(ex) =>
+        error(s"process run error: ${ex.getMessage}", ex)
+        throw ex
+    }
+
     dqApp
   }
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
index 3d0aa0e..93aa0ff 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -24,12 +24,12 @@
 import org.apache.spark.sql.DataFrame
 
 /**
- * sink records and metrics in memory for test.
+ * A dummy batch sink for testing.
  *
  * @param config sink configurations
- * @param jobName
- * @param timeStamp
- * @param block
+ * @param jobName Griffin Job Name
+ * @param timeStamp timestamp for job
+ * @param block is blocking or not
  */
 case class CustomSink(config: Map[String, Any], jobName: String, timeStamp: Long, block: Boolean)
     extends Sink {
@@ -50,10 +50,46 @@
   val allMetrics: mutable.Map[String, Any] = mutable.Map[String, Any]()
 
   override def sinkMetrics(metrics: Map[String, Any]): Unit = {
+    val (metricName: String, value: Map[String, Any]) = metrics("value")
+      .asInstanceOf[Map[String, Any]]
+      .head
+
+    CustomSinkResultRegister.setMetrics(metricName, value)
+
     allMetrics ++= metrics
   }
 
   override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
+    CustomSinkResultRegister.setBatch(key.get, dataset.toJSON.collect())
     allRecords ++= dataset.toJSON.rdd.collect()
   }
 }
+
+/**
+ * Register for storing test sink results in memory
+ */
+object CustomSinkResultRegister {
+
+  private val _metricsSink: mutable.Map[String, Map[String, Any]] = mutable.HashMap.empty
+  private val _batchSink: mutable.Map[String, Array[String]] = mutable.HashMap.empty
+
+  def setMetrics(key: String, metrics: Map[String, Any]): Unit = {
+    val updatedMetrics = _metricsSink.getOrElse(key, Map.empty) ++ metrics
+    _metricsSink.put(key, updatedMetrics)
+  }
+
+  def getMetrics(key: String): Option[Map[String, Any]] = _metricsSink.get(key)
+
+  def setBatch(key: String, batch: Array[String]): Unit = {
+    val updatedBatch = _batchSink.getOrElse(key, Array.empty) ++ batch
+    _batchSink.put(key, updatedBatch)
+  }
+
+  def getBatch(key: String): Option[Array[String]] = _batchSink.get(key)
+
+  def clear(): Unit = {
+    _metricsSink.clear()
+    _batchSink.clear()
+  }
+
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index 8bf81b1..55e1e9c 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -19,8 +19,10 @@
 
 import scala.collection.mutable
 
+import org.apache.spark.sql.functions._
+
 import org.apache.griffin.measure.configuration.dqdefinition.{RuleOutputParam, SinkParam}
-import org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
+import org.apache.griffin.measure.configuration.enums.FlattenType.MapFlattenType
 import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep, RecordWriteStep}
 
 class CustomSinkTest extends SinkTestBase {
@@ -40,29 +42,30 @@
   }
 
   "custom sink" can "sink metrics" in {
-    val actualMetrics = withCustomSink(sinks => {
+    val measureName = "test_measure"
+    withCustomSink(sinks => {
       sinks.foreach { sink =>
         try {
-          sink.sinkMetrics(Map("sum" -> 10))
+          sink.sinkMetrics(Map("value" -> Map(measureName -> Map("sum" -> 10))))
         } catch {
           case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
         }
       }
+
       sinks.foreach { sink =>
         try {
-          sink.sinkMetrics(Map("count" -> 5))
+          sink.sinkMetrics(Map("value" -> Map(measureName -> Map("count" -> 5))))
         } catch {
           case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
         }
       }
-      sinks.headOption match {
-        case Some(sink: CustomSink) => sink.allMetrics
-        case _ => Map.empty
-      }
     })
 
+    val actualMetricsOpt = CustomSinkResultRegister.getMetrics(measureName)
+    assert(actualMetricsOpt.isDefined)
+
     val expected = Map("sum" -> 10, "count" -> 5)
-    actualMetrics should be(expected)
+    actualMetricsOpt.get should contain theSameElementsAs expected
   }
 
   "custom sink" can "sink records" in {
@@ -134,33 +137,44 @@
   "MetricWriteStep" should "output default metrics with custom sink" in {
     val resultTable = "result_table"
     val df = createDataFrame(1 to 5)
-    df.groupBy("sex")
-      .agg("age" -> "max", "age" -> "avg")
-      .createOrReplaceTempView(resultTable)
+    val metricCols = Seq("sex", "max_age", "avg_age").flatMap(c => Seq(lit(c), col(c)))
+
+    val metricDf = df
+      .groupBy("sex")
+      .agg(max("age").as("max_age"), avg("age").as("avg_age"))
+      .select(map(metricCols: _*).as("metrics"))
+      .withColumn("mark", lit(1))
+      .groupBy("mark")
+      .agg(collect_list("metrics") as "metrics")
+      .select("metrics")
+
+    metricDf.createOrReplaceTempView(resultTable)
 
     val dQContext = getDqContext()
 
     val metricWriteStep = {
-      val metricOpt = Some(metricsDefaultOutput)
+      val metricOpt = Some(metricsMapOutput)
       val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse("default_metrics_name")
-      val flattenType = metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
+      val flattenType = metricOpt.map(_.getFlatten).getOrElse(MapFlattenType)
+
       MetricWriteStep(mwName, resultTable, flattenType)
     }
 
     metricWriteStep.execute(dQContext)
     MetricFlushStep().execute(dQContext)
-    val actualMetrics = dQContext.getSinks.headOption match {
-      case Some(sink: CustomSink) => sink.allMetrics
-      case _ => mutable.Map[String, Any]()
-    }
 
-    val metricsValue = Seq(
-      Map("sex" -> "man", "max(age)" -> 19, "avg(age)" -> 18.0),
-      Map("sex" -> "women", "max(age)" -> 20, "avg(age)" -> 18.0))
+    val expectedMetrics = Array(
+      Map("sex" -> "women", "max_age" -> "20", "avg_age" -> "18.0"),
+      Map("sex" -> "man", "max_age" -> "19", "avg_age" -> "18.0"))
 
-    val expected = Map("default_output" -> metricsValue)
+    val actualMetricsOpt = CustomSinkResultRegister.getMetrics(metricWriteStep.name)
+    assert(actualMetricsOpt.isDefined)
 
-    actualMetrics("value") should be(expected)
+    val actualMetricsMap: Map[String, Any] = actualMetricsOpt.get
+    assert(actualMetricsMap.contains("metrics"))
+
+    val actualMetrics = actualMetricsMap("metrics").asInstanceOf[Seq[Map[String, String]]]
+    actualMetrics should contain theSameElementsAs expectedMetrics
   }
 
 }