[GRIFFIN-347] Updated with master
diff --git a/.gitignore b/.gitignore
index 93610da..8d3873e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 *.iml
 .idea/
 .DS_Store
+.java-version
 target/**
 
 # Mobile Tools for Java (J2ME)
@@ -11,6 +12,7 @@
 *.jar
 *.war
 *.ear
+*.cfg
 target
 service/src/main/resources/public/
 ui/.tmp/
@@ -20,7 +22,7 @@
 .settings/
 .classpath
 bin
-
+**/site-packages/**
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
 # don't need binary 
diff --git a/griffin-doc/deploy/measure-build-guide.md b/griffin-doc/deploy/measure-build-guide.md
new file mode 100644
index 0000000..8914bc6
--- /dev/null
+++ b/griffin-doc/deploy/measure-build-guide.md
@@ -0,0 +1,74 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Griffin Build Guide - Measure Module
+
+Like other modules of Apache Griffin, the `measure` module is built using the Maven build tool. Building the `measure`
+module requires Maven version 3.5+ and Java 8.
+
+## Version Compatibility
+
+Starting from Apache Griffin 0.7, the `measure` module is cross-version compatible across Scala and Spark. Since both
+Scala and Spark are dependencies of Apache Griffin, the supported Spark-Scala version combinations are listed below:
+
+|           | Spark 2.3.x | Spark 2.4.x | Spark 3.0.x |
+| --------- |:-----------:|:-----------:|:-----------:|
+| Scala 2.11| ✓           | ✓           | x           |
+| Scala 2.12| x           | ✓           | ✓           |
+
+## Building a Distribution
+
+Execute the commands below to build `measure` with the desired versions of Spark and Scala.
+
+By default, the build is compiled with Scala 2.11 and Apache Spark 2.4.x.
+
+```
+# For measure module with Scala 2.11 and Spark 2.4.x
+mvn clean package
+```
+
+To change the Scala or Spark version, use the commands below:
+
+```
+# For measure module with Scala 2.12 and Spark 2.4.x
+mvn clean package -Dscala-2.12
+```
+
+```
+# For measure module with Scala 2.11 and Spark 2.3.x
+mvn clean package -Dspark-2.3
+```
+
+```
+# For measure module with Scala 2.12 and Spark 3.0.x
+mvn clean package -Dscala-2.12 -Dspark-3.0 
+```
+
+Note:
+ - Using the `-Dscala-2.12` and `-Dspark-2.3` options together will cause a build failure due to missing dependencies,
+   as Spark 2.3.x is not cross compiled for Scala 2.12; see details [here](#version-compatibility)
+ - Using the `-Dspark-3.0` option without the `-Dscala-2.12` option will cause a build failure due to missing dependencies,
+   as Spark 3.0.x is not cross compiled for Scala 2.11; see details [here](#version-compatibility)
+
+### AVRO Source Support
+
+Starting with Spark 2.4.x, the AVRO source (`spark-avro` package) was migrated from the `com.databricks` group to
+`org.apache.spark`. Additionally, the older dependency does not support Scala 2.12.
+
+All builds of `measure` module will contain AVRO source support by default.
diff --git a/griffin-doc/measure/predicates.md b/griffin-doc/measure/predicates.md
index 87f1a0d..f9187b9 100644
--- a/griffin-doc/measure/predicates.md
+++ b/griffin-doc/measure/predicates.md
@@ -17,14 +17,13 @@
 under the License.
 -->
 
-#About predicates
+# About predicates
 
-##Overview
+## Overview
 The purpose of predicates is obligate Griffin to check certain conditions before starting SparkSubmitJob. 
 Depending on these conditions Griffin need to start or not start the measurement.
 
-##Configure predicates
-
+## Configure predicates
 For configuring predicates need add property to measure json:
 ```
 {
@@ -74,10 +73,10 @@
 - implement interface **org.apache.griffin.core.job.Predicator**
 - have constructor with argument of type **org.apache.griffin.core.job.entity.SegmentPredicate**
 
-##Deployment custom predicates
+## Deployment custom predicates
 For the creating custom predicate you need 
 1. Build the Griffin service using command
-As a result, two artifacts will be built  
+As a result, two artifacts will be built:  
 - **service-VERSION.jar** - executable Spring-Boot application
 - **service-VERSION-lib.jar** - jar, which we can use as a dependency
 This step is necessary because we can't use executable Spring-Boot application as a dependency in our plugin. 
diff --git a/measure/assembly.xml b/measure/assembly.xml
deleted file mode 100644
index 7890497..0000000
--- a/measure/assembly.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>package</id>
-  <formats>
-    <format>tar.gz</format>
-  </formats>
-  <fileSets>
-    <fileSet>
-      <directory>${project.basedir}/sbin</directory>
-      <outputDirectory>/bin</outputDirectory>
-      <includes>
-        <include>/**</include>
-      </includes>
-      <lineEnding>unix</lineEnding>
-      <fileMode>0777</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>${project.basedir}/src/main/resources</directory>
-      <outputDirectory>/conf</outputDirectory>
-      <includes>
-        <include>/**</include>
-      </includes>
-    </fileSet>
-  </fileSets>
-
-  <dependencySets>
-    <dependencySet>
-      <useProjectArtifact>true</useProjectArtifact>
-      <outputDirectory>/lib</outputDirectory>
-      <unpack>false</unpack>
-      <scope>provided</scope>
-      <excludes>
-        <exclude>com.twitter:parquet-hadoop-bundle</exclude>
-        <exclude>io.dropwizard.metrics:*</exclude>
-        <exclude>org.glassfish.jersey.core:*</exclude>
-        <exclude>org.glassfish.jersey.containers:*</exclude>
-        <exclude>org.apache.thrift:*</exclude>
-        <exclude>org.apache.parquet:*</exclude>
-        <exclude>org.apache.hadoop:*</exclude>
-        <exclude>org.apache.spark:*</exclude>
-      </excludes>
-    </dependencySet>
-  </dependencySets>
-</assembly>
diff --git a/measure/pom.xml b/measure/pom.xml
index a1d8f8e..3cd8bc6 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -24,297 +24,314 @@
     <parent>
         <groupId>org.apache.griffin</groupId>
         <artifactId>griffin</artifactId>
-        <version>0.6.0-SNAPSHOT</version>
+        <version>0.7.0-SNAPSHOT</version>
     </parent>
 
     <artifactId>measure</artifactId>
     <packaging>jar</packaging>
 
-    <name>Apache Griffin :: Measures</name>
-    <url>http://maven.apache.org</url>
+    <name>Apache Griffin :: Measure</name>
+    <url>http://griffin.apache.org</url>
 
     <properties>
-        <scala.version>2.11.8</scala.version>
-        <spark.version>2.2.1</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <avro.version>1.7.7</avro.version>
-        <jackson.version>2.8.7</jackson.version>
+
+        <!-- Spark -->
+        <spark.major.version>2.4</spark.major.version>
+        <spark.version>${spark.major.version}.4</spark.version>
+        <spark.scope>${provided.scope}</spark.scope>
+        <spark-streaming-kafka.version>${spark.version}</spark-streaming-kafka.version>
+
+        <!-- Code Standardization -->
+        <scalastyle.version>1.0.0</scalastyle.version>
+        <scalafmt.version>1.0.3</scalafmt.version>
+
+        <!-- Data Connectors -->
+        <mysql.java.version>5.1.47</mysql.java.version>
+        <mariadb.version>2.7.0</mariadb.version>
+        <cassandra.connector.version>2.5.1</cassandra.connector.version>
+        <elasticsearch.version>6.4.1</elasticsearch.version>
         <scalaj.version>2.3.0</scalaj.version>
         <mongo.version>2.1.0</mongo.version>
-        <scalatest.version>3.0.0</scalatest.version>
-        <slf4j.version>1.7.21</slf4j.version>
-        <log4j.version>1.2.16</log4j.version>
         <curator.version>2.10.0</curator.version>
-        <mockito.version>1.10.19</mockito.version>
-        <mysql.java.version>5.1.47</mysql.java.version>
-        <cassandra.connector.version>2.4.1</cassandra.connector.version>
-        <scalastyle.version>1.0.0</scalastyle.version>
-        <scalafmt.parameters>--diff --test</scalafmt.parameters>
-        <scalafmt.skip>false</scalafmt.skip>
-        <elasticsearch.version>6.4.1</elasticsearch.version>
-        <spark.scope>provided</spark.scope>
+
+        <!-- Testing -->
+        <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
+        <mockito.version>3.2.2.0</mockito.version>
+        <scalatest.version>3.2.3</scalatest.version>
     </properties>
 
     <dependencies>
-        <!--scala-->
+
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>${scala.version}</version>
         </dependency>
 
-        <!--spark, spark streaming, spark hive-->
+        <!-- Spark Dependencies -->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>${spark.scope}</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>${spark.scope}</scope>
-        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>${spark.scope}</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${spark.scope}</scope>
+        </dependency>
+
+        <!-- Data Connectors -->
+
+        <!-- Spark Hive-->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>${spark.scope}</scope>
+        </dependency>
+
+        <!-- Spark Kafka-->
+        <!-- TODO
+        Spark Streaming Kafka 0-8 is not cross compiled with Scala 2.12.
+        Thus, this dependency has been temporarily un-banned in the enforcer plugin.
+        This will be removed with the structured streaming implementation.
+        -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka-0-8_${scala211.binary.version}</artifactId>
+            <version>${spark-streaming-kafka.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>commons-httpclient</groupId>
-                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>org.scala-lang.modules</groupId>
+                    <artifactId>scala-xml_2.11</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpclient</artifactId>
+                    <groupId>org.scala-lang.modules</groupId>
+                    <artifactId>scala-parser-combinators_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-tags_2.11</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
 
-        <!--jackson-->
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-            <version>${jackson.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-            <version>${jackson.version}</version>
-        </dependency>
-
-        <!--scalaj for http request-->
-        <dependency>
-            <groupId>org.scalaj</groupId>
-            <artifactId>scalaj-http_${scala.binary.version}</artifactId>
-            <version>${scalaj.version}</version>
-        </dependency>
-
-        <!--mongo request-->
+        <!-- MongoDB-->
         <dependency>
             <groupId>org.mongodb.scala</groupId>
-            <artifactId>mongo-scala-driver_2.11</artifactId>
+            <artifactId>mongo-scala-driver_${scala.binary.version}</artifactId>
             <version>${mongo.version}</version>
         </dependency>
 
-        <!--avro-->
+        <!-- MySql -->
         <dependency>
-            <groupId>com.databricks</groupId>
-            <artifactId>spark-avro_${scala.binary.version}</artifactId>
-            <version>4.0.0</version>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.java.version}</version>
         </dependency>
 
-        <!--log4j-->
+        <!-- MariaDB -->
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>${slf4j.version}</version>
+            <groupId>org.mariadb.jdbc</groupId>
+            <artifactId>mariadb-java-client</artifactId>
+            <version>${mariadb.version}</version>
         </dependency>
 
+        <!-- Spark Cassandra-->
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>${log4j.version}</version>
+            <groupId>com.datastax.spark</groupId>
+            <artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
+            <version>${cassandra.connector.version}</version>
         </dependency>
 
-        <!--curator-->
+        <!-- Spark Elasticsearch-->
+        <!-- TODO
+        Elasticsearch Spark has recently added support for Scala 2.12, but it is not published to Maven Central yet.
+        Thus, this dependency has been temporarily un-banned in the enforcer plugin.
+        See https://github.com/elastic/elasticsearch-hadoop/pull/1589#issuecomment-776920189
+        -->
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch-spark-20_${scala211.binary.version}</artifactId>
+            <version>${elasticsearch.version}</version>
+        </dependency>
+
+        <!-- Miscellaneous-->
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
             <version>${curator.version}</version>
         </dependency>
 
-        <!--junit-->
+        <dependency>
+            <groupId>org.scalaj</groupId>
+            <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+            <version>${scalaj.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.9</version>
+        </dependency>
+
+        <!-- Test -->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
+            <scope>test</scope>
         </dependency>
 
-        <!--scala test-->
         <dependency>
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest_${scala.binary.version}</artifactId>
             <version>${scalatest.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <version>${mockito.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <groupId>org.scalatestplus</groupId>
+            <artifactId>mockito-3-4_${scala.binary.version}</artifactId>
             <version>${mockito.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>${mysql.java.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.datastax.spark</groupId>
-            <artifactId>spark-cassandra-connector_2.11</artifactId>
-            <version>${cassandra.connector.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>4.5.9</version>
-        </dependency>
-        <dependency>
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
             <version>1.4.200</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch-spark-20_2.11</artifactId>
-            <version>${elasticsearch.version}</version>
-        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>elasticsearch</artifactId>
             <version>1.14.3</version>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 
     <build>
         <sourceDirectory>src/main/scala</sourceDirectory>
         <testSourceDirectory>src/test/scala</testSourceDirectory>
         <plugins>
+
             <plugin>
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
-                <version>3.3.1</version>
+                <version>4.3.0</version>
                 <executions>
                     <execution>
-                        <id>compile</id>
                         <goals>
                             <goal>compile</goal>
                             <goal>testCompile</goal>
                         </goals>
-                        <phase>compile</phase>
                     </execution>
                 </executions>
                 <configuration>
                     <scalaVersion>${scala.version}</scalaVersion>
+                    <checkMultipleScalaVersions>true</checkMultipleScalaVersions>
+                    <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
+                    <recompileMode>incremental</recompileMode>
                     <args>
+                        <arg>-unchecked</arg>
                         <arg>-deprecation</arg>
                         <arg>-feature</arg>
-                        <arg>-unchecked</arg>
+                        <arg>-explaintypes</arg>
                     </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms64m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path,-try</javacArg>
+                    </javacArgs>
                 </configuration>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-            </plugin>
+
             <plugin>
                 <groupId>org.scalatest</groupId>
                 <artifactId>scalatest-maven-plugin</artifactId>
+                <version>2.0.2</version>
+                <configuration>
+                    <testFailureIgnore>false</testFailureIgnore>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>TestSuiteReport.txt</filereports>
+                    <stderr/>
+                    <systemProperties>
+                        <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+                    </systemProperties>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
             </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
-                <version>3.1.0</version>
-                <configuration>
-                    <createDependencyReducedPom>false</createDependencyReducedPom>
-                </configuration>
+                <version>3.2.4</version>
                 <executions>
                     <execution>
                         <phase>package</phase>
                         <goals>
                             <goal>shade</goal>
                         </goals>
-                        <configuration>
-                            <transformers>
-                                <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>org.apache.griffin.measure.Application</mainClass>
-                                </transformer>
-                            </transformers>
-                            <relocations>
-                                <relocation>
-                                    <pattern>org.apache.http</pattern>
-                                    <shadedPattern>griffin.org.apache.http</shadedPattern>
-                                </relocation>
-                            </relocations>
-                            <filters>
-                                <filter>
-                                    <artifact>*:*</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/maven/**</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                        </configuration>
                     </execution>
                 </executions>
+                <configuration>
+                    <createDependencyReducedPom>false</createDependencyReducedPom>
+                    <minimizeJar>true</minimizeJar>
+                    <relocations>
+                        <relocation>
+                            <pattern>org.apache.http</pattern>
+                            <shadedPattern>griffin.org.apache.http</shadedPattern>
+                        </relocation>
+                    </relocations>
+                </configuration>
             </plugin>
+
             <plugin>
                 <groupId>org.antipathy</groupId>
                 <artifactId>mvn-scalafmt_${scala.binary.version}</artifactId>
-                <version>0.12_1.5.1</version>
+                <version>${scalafmt.version}</version>
                 <configuration>
-                    <parameters>${scalafmt.parameters}</parameters>
-                    <skip>${scalafmt.skip}</skip>
-                    <skipSources>${scalafmt.skip}</skipSources>
-                    <skipTestSources>${scalafmt.skip}</skipTestSources>
                     <configLocation>${project.parent.basedir}/.scalafmt.conf</configLocation>
+                    <skipTestSources>false</skipTestSources>
+                    <skipSources>false</skipSources>
+                    <respectVersion>false</respectVersion>
+                    <validateOnly>false</validateOnly>
                 </configuration>
                 <executions>
                     <execution>
-                        <phase>validate</phase>
+                        <phase>compile</phase>
                         <goals>
                             <goal>format</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
+
             <plugin>
                 <groupId>org.scalastyle</groupId>
                 <artifactId>scalastyle-maven-plugin</artifactId>
@@ -333,21 +350,81 @@
                 </configuration>
                 <executions>
                     <execution>
-                        <phase>validate</phase>
+                        <phase>compile</phase>
                         <goals>
                             <goal>check</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
+
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>0.8.2</version>
+                <executions>
+                    <execution>
+                        <id>default-prepare-agent</id>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>report</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>report</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scoverage</groupId>
+                <artifactId>scoverage-maven-plugin</artifactId>
+                <version>${scoverage.plugin.version}</version>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <aggregate>true</aggregate>
+                    <highlighting>true</highlighting>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.2.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <classifier>spark-${spark.major.version}_scala-${scala.binary.version}</classifier>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
                 <configuration>
-                    <descriptors>
-                        <descriptor>assembly.xml</descriptor>
-                    </descriptors>
-                    <finalName>${artifactId}-${project.version}</finalName>
+                    <tarLongFileMode>posix</tarLongFileMode>
+                    <finalName>
+                        ${project.artifactId}-${project.version}-spark-${spark.major.version}_scala-${scala.binary.version}
+                    </finalName>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.griffin.measure.Application</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
                 </configuration>
                 <executions>
                     <execution>
@@ -359,6 +436,172 @@
                     </execution>
                 </executions>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>3.0.0-M2</version>
+                <executions>
+                    <execution>
+                        <id>enforce-versions</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <requireJavaVersion>
+                                    <version>${java.version}</version>
+                                </requireJavaVersion>
+                                <bannedDependencies>
+                                    <excludes>
+                                        <exclude>*:*_2.10</exclude>
+                                        <exclude>*:*_2.12</exclude>
+                                        <exclude>*:*_2.13</exclude>
+                                    </excludes>
+                                    <searchTransitive>true</searchTransitive>
+                                </bannedDependencies>
+                            </rules>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>enforce-no-duplicate-dependencies</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <banDuplicatePomDependencyVersions/>
+                            </rules>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <!-- Scala 2.12 -->
+        <profile>
+            <id>scala-2.12</id>
+            <activation>
+                <property>
+                    <name>scala-2.12</name>
+                </property>
+            </activation>
+
+            <properties>
+                <scala.binary.version>2.12</scala.binary.version>
+                <scala.version>${scala.binary.version}.12</scala.version>
+            </properties>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-enforcer-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>enforce-versions</id>
+                                <goals>
+                                    <goal>enforce</goal>
+                                </goals>
+                                <configuration>
+                                    <rules>
+                                        <requireJavaVersion>
+                                            <version>${java.version}</version>
+                                        </requireJavaVersion>
+                                        <bannedDependencies>
+                                            <!-- TODO Temporary Fix: These will be removed in a future iteration. -->
+                                            <includes>
+                                                <include>org.elasticsearch:elasticsearch-spark-20_2.11:*</include>
+                                                <include>org.apache.spark:spark-streaming-kafka-0-8_2.11:*</include>
+                                                <include>org.apache.kafka:kafka_2.11:*</include>
+                                            </includes>
+                                            <excludes>
+                                                <exclude>*:*_2.10</exclude>
+                                                <exclude>*:*_2.11</exclude>
+                                                <exclude>*:*_2.13</exclude>
+                                            </excludes>
+                                            <searchTransitive>true</searchTransitive>
+                                        </bannedDependencies>
+                                    </rules>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>enforce-no-duplicate-dependencies</id>
+                                <goals>
+                                    <goal>enforce</goal>
+                                </goals>
+                                <configuration>
+                                    <rules>
+                                        <banDuplicatePomDependencyVersions/>
+                                    </rules>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
+        <!-- Spark 2.3.x -->
+        <profile>
+            <id>spark-2.3</id>
+            <activation>
+                <property>
+                    <name>spark-2.3</name>
+                </property>
+            </activation>
+
+            <properties>
+                <spark.major.version>2.3</spark.major.version>
+                <spark.version>${spark.major.version}.4</spark.version>
+            </properties>
+
+            <dependencies>
+                <!-- Old Spark Avro-->
+                <dependency>
+                    <groupId>com.databricks</groupId>
+                    <artifactId>spark-avro_${scala211.binary.version}</artifactId>
+                    <version>4.0.0</version>
+                </dependency>
+            </dependencies>
+        </profile>
+
+        <!-- Spark 3.0.x -->
+        <profile>
+            <id>spark-3.0</id>
+            <activation>
+                <property>
+                    <name>spark-3.0</name>
+                </property>
+            </activation>
+
+            <properties>
+                <spark.major.version>3.0</spark.major.version>
+                <spark.version>${spark.major.version}.2</spark.version>
+                <spark-streaming-kafka.version>2.4.7</spark-streaming-kafka.version>
+            </properties>
+        </profile>
+
+        <!-- Spark 2.4+ Avro -->
+        <profile>
+            <id>spark-avro-2.4</id>
+            <activation>
+                <property>
+                    <name>!spark-2.3</name>
+                </property>
+            </activation>
+
+            <dependencies>
+                <!-- New Spark Avro-->
+                <dependency>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-avro_${scala.binary.version}</artifactId>
+                    <version>${spark.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+
+    </profiles>
 </project>
diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json
index bbec4e5..fd9261d 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -7,14 +7,14 @@
   },
   "sinks": [
     {
-      "name": "consoleSink",
+      "name": "console",
       "type": "CONSOLE",
       "config": {
         "max.log.lines": 10
       }
     },
     {
-      "name": "hdfsSink",
+      "name": "hdfs",
       "type": "HDFS",
       "config": {
         "path": "hdfs://localhost/griffin/batch/persist",
@@ -23,7 +23,7 @@
       }
     },
     {
-      "name": "elasticSink",
+      "name": "elasticsearch",
       "type": "ELASTICSEARCH",
       "config": {
         "method": "post",
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
new file mode 100644
index 0000000..e7c19a8
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamHttpReader.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.configuration.dqdefinition.reader
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+import org.apache.griffin.measure.configuration.dqdefinition.Param
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+
+/**
+ * Reads params over http: issues a GET request to the url and parses the returned JSON string.
+ *
+ * @param httpUrl url the GET request is sent to
+ */
+case class ParamHttpReader(httpUrl: String) extends ParamReader {
+
+  def readConfig[T <: Param](implicit m: ClassTag[T]): Try[T] = {
+    Try {
+      val params = Map[String, Object]()
+      val headers = Map[String, Object](("Content-Type", "application/json"))
+      val jsonString = HttpUtil.doHttpRequest(httpUrl, "get", params, headers, null)._2
+      val param = JsonUtil.fromJson[T](jsonString)
+      validate(param)
+    }
+  }
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
index 5067a9d..f4bfd28 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReaderFactory.scala
@@ -23,6 +23,7 @@
 
   val json = "json"
   val file = "file"
+  val httpRegex = "^http[s]?://.*"
 
   /**
    * parse string content to get param reader
@@ -30,9 +31,13 @@
    * @return
    */
   def getParamReader(pathOrJson: String): ParamReader = {
-    val strType = paramStrType(pathOrJson)
-    if (json.equals(strType)) ParamJsonReader(pathOrJson)
-    else ParamFileReader(pathOrJson)
+    if (pathOrJson.matches(httpRegex)) {
+      ParamHttpReader(pathOrJson)
+    } else {
+      val strType = paramStrType(pathOrJson)
+      if (json.equals(strType)) ParamJsonReader(pathOrJson)
+      else ParamFileReader(pathOrJson)
+    }
   }
 
   private def paramStrType(str: String): String = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index 04775c7..643ff5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -184,14 +184,15 @@
     }
 
     // new cache data
-    val newDfOpt = try {
-      val dfr = sparkSession.read
-      readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
-    } catch {
-      case e: Throwable =>
-        warn(s"read data source cache warn: ${e.getMessage}")
-        None
-    }
+    val newDfOpt =
+      try {
+        val dfr = sparkSession.read
+        readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
+      } catch {
+        case e: Throwable =>
+          warn(s"read data source cache warn: ${e.getMessage}")
+          None
+      }
 
     // old cache data
     val oldCacheIndexOpt = if (updatable) readOldCacheIndex() else None
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 39827cf..0b08dd4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -32,8 +32,15 @@
 
 object DataConnectorFactory extends Loggable {
 
-  @deprecated val AvroRegex: Regex = """^(?i)avro$""".r
-  @deprecated val TextDirRegex: Regex = """^(?i)text-dir$""".r
+  @deprecated(
+    s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}' with correct format.",
+    "0.6.0")
+  val AvroRegex: Regex = """^(?i)avro$""".r
+
+  @deprecated(
+    s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}' with correct format.",
+    "0.6.0")
+  val TextDirRegex: Regex = """^(?i)text-dir$""".r
 
   val HiveRegex: Regex = """^(?i)hive$""".r
   val FileRegex: Regex = """^(?i)file$""".r
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
index d135f3b..2fd2912 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
@@ -52,33 +52,35 @@
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = {
 
-    val dfOpt = try {
-      sparkSession.conf.set("spark.cassandra.connection.host", host)
-      sparkSession.conf.set("spark.cassandra.connection.port", port)
-      sparkSession.conf.set("spark.cassandra.auth.username", user)
-      sparkSession.conf.set("spark.cassandra.auth.password", password)
+    val dfOpt =
+      try {
+        sparkSession.conf.set("spark.cassandra.connection.host", host)
+        sparkSession.conf.set("spark.cassandra.connection.port", port)
+        sparkSession.conf.set("spark.cassandra.auth.username", user)
+        sparkSession.conf.set("spark.cassandra.auth.password", password)
 
-      val tableDef: DataFrameReader = sparkSession.read
-        .format("org.apache.spark.sql.cassandra")
-        .options(Map("table" -> tableName, "keyspace" -> database))
+        val tableDef: DataFrameReader = sparkSession.read
+          .format("org.apache.spark.sql.cassandra")
+          .options(Map("table" -> tableName, "keyspace" -> database))
 
-      val dataWh: String = dataWhere()
+        val dataWh: String = dataWhere()
 
-      var data: DataFrame = null
-      if (wheres.length > 0) {
-        data = tableDef.load().where(dataWh)
-      } else {
-        data = tableDef.load()
+        var data: DataFrame = null
+        if (wheres.length > 0) {
+          data = tableDef.load().where(dataWh)
+        } else {
+          data = tableDef.load()
+        }
+
+        val dfOpt = Some(data)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } catch {
+        case e: Throwable =>
+          // Report the table that was actually loaded (`tableName`), not the `TableName` identifier.
+          error(s"load cassandra table $database.$tableName fails: ${e.getMessage}", e)
+          None
       }
-
-      val dfOpt = Some(data)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
-        None
-    }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index dfdacc8..597695e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -77,23 +77,24 @@
   def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
     val path: String = s"/_sql?format=csv"
     info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
-    val dfOpt = try {
-      val answer = httpPost(path, sql)
-      if (answer._1) {
-        import sparkSession.implicits._
-        val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
-        val reader: DataFrameReader = sparkSession.read
-        reader.option("header", true).option("inferSchema", true)
-        val df: DataFrame = reader.csv(rdd.toDS())
-        val dfOpt = Some(df)
-        val preDfOpt = preProcess(dfOpt, ms)
-        preDfOpt
-      } else None
-    } catch {
-      case e: Throwable =>
-        error(s"load ES by sql $host:$port $sql  fails: ${e.getMessage}", e)
-        None
-    }
+    val dfOpt =
+      try {
+        val answer = httpPost(path, sql)
+        if (answer._1) {
+          import sparkSession.implicits._
+          val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
+          val reader: DataFrameReader = sparkSession.read
+          reader.option("header", true).option("inferSchema", true)
+          val df: DataFrame = reader.csv(rdd.toDS())
+          val dfOpt = Some(df)
+          val preDfOpt = preProcess(dfOpt, ms)
+          preDfOpt
+        } else None
+      } catch {
+        case e: Throwable =>
+          error(s"load ES by sql $host:$port $sql  fails: ${e.getMessage}", e)
+          None
+      }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
@@ -102,44 +103,44 @@
     val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
     info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
 
-    val dfOpt = try {
-      val answer = httpGet(path)
-      val data = ArrayBuffer[Map[String, Number]]()
+    val dfOpt =
+      try {
+        val answer = httpGet(path)
+        val data = ArrayBuffer[Map[String, Number]]()
 
-      if (answer._1) {
-        val arrayAnswers: util.Iterator[JsonNode] =
-          parseString(answer._2).get("hits").get("hits").elements()
+        if (answer._1) {
+          val arrayAnswers: util.Iterator[JsonNode] =
+            parseString(answer._2).get("hits").get("hits").elements()
 
-        while (arrayAnswers.hasNext) {
-          val answer = arrayAnswers.next()
-          val values = answer.get("_source").get("value")
-          val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
-          val fieldsMap = mutable.Map[String, Number]()
-          while (fields.hasNext) {
-            val fld: util.Map.Entry[String, JsonNode] = fields.next()
-            fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+          while (arrayAnswers.hasNext) {
+            val answer = arrayAnswers.next()
+            val values = answer.get("_source").get("value")
+            val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
+            val fieldsMap = mutable.Map[String, Number]()
+            while (fields.hasNext) {
+              val fld: util.Map.Entry[String, JsonNode] = fields.next()
+              fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+            }
+            data += fieldsMap.toMap
           }
-          data += fieldsMap.toMap
         }
+        val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
+        val columns: Array[String] = fields.toArray
+        val defaultNumber: Number = 0.0
+        val rdd: RDD[Row] = rdd1
+          .map { x: Map[String, Number] =>
+            Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
+          }
+        val schema = dfSchema(columns.toList)
+        val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } catch {
+        case e: Throwable =>
+          error(s"load ES table $host:$port $index/$dataType  fails: ${e.getMessage}", e)
+          None
       }
-      val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
-      val columns: Array[String] = fields.toArray
-      val defaultNumber: Number = 0.0
-      val rdd: RDD[Row] = rdd1
-        .map { x: Map[String, Number] =>
-          Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
-        }
-      val schema = dfSchema(columns.toList)
-      val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
-      df.show(20)
-      val dfOpt = Some(df)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"load ES table $host:$port $index/$dataType  fails: ${e.getMessage}", e)
-        None
-    }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
index 86049bd..2e4f482 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -79,7 +79,14 @@
     SupportedFormats.contains(format),
     s"Invalid format '$format' specified. Must be one of ${SupportedFormats.mkString("['", "', '", "']")}")
 
-  if (format == "csv") validateCSVOptions()
+  if (format.equalsIgnoreCase("avro") && sparkSession.version < "2.3.0") {
+    format = "com.databricks.spark.avro"
+  }
+
+  if (format == "csv") {
+    validateCSVOptions()
+  }
+
   if (format == "tsv") {
     format = "csv"
     options.getOrElseUpdate(Delimiter, TabDelimiter)
@@ -169,7 +176,8 @@
   private val TabDelimiter: String = "\t"
 
   private val DefaultFormat: String = SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
-  private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", "text", "csv", "tsv")
+  private val SupportedFormats: Seq[String] =
+    Seq("parquet", "orc", "avro", "text", "csv", "tsv", "com.databricks.spark.avro")
 
   /**
    * Validates the existence of paths in a given sequence.
@@ -189,7 +197,7 @@
           else throw new IllegalArgumentException(msg)
 
           false
-      })
+        })
 
     assert(validPaths.nonEmpty, "No paths were given for the data source.")
     validPaths
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
index 9fc21d7..2f77db3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
@@ -69,27 +69,28 @@
   assert(isJDBCDriverLoaded(driver), s"JDBC driver $driver not present in classpath")
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
-      val dtSql = createSqlStmt()
-      val prop = new java.util.Properties
-      prop.setProperty("user", user)
-      prop.setProperty("password", password)
-      prop.setProperty("driver", driver)
-      val dfOpt = Try(sparkSession.read.jdbc(url, s"($dtSql) as t", prop))
+    val dfOpt =
+      try {
+        val dtSql = createSqlStmt()
+        val prop = new java.util.Properties
+        prop.setProperty("user", user)
+        prop.setProperty("password", password)
+        prop.setProperty("driver", driver)
+        val dfOpt = Try(sparkSession.read.jdbc(url, s"($dtSql) as t", prop))
 
-      dfOpt match {
-        case Success(_) =>
-        case Failure(exception) =>
-          griffinLogger.error("Error occurred while reading data set.", exception)
+        dfOpt match {
+          case Success(_) =>
+          case Failure(exception) =>
+            griffinLogger.error("Error occurred while reading data set.", exception)
+        }
+
+        val preDfOpt = preProcess(dfOpt.toOption, ms)
+        preDfOpt
+      } catch {
+        case e: Throwable =>
+          error(s"loading table $fullTableName fails: ${e.getMessage}", e)
+          None
       }
-
-      val preDfOpt = preProcess(dfOpt.toOption, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"loading table $fullTableName fails: ${e.getMessage}", e)
-        None
-    }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
@@ -99,7 +100,7 @@
    */
   private def createSqlStmt(): String = {
     val tableClause = s"SELECT * FROM $fullTableName"
-    if (whereString.length > 0) {
+    if (whereString.nonEmpty) {
       s"$tableClause WHERE $whereString"
     } else tableClause
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
index feacfc9..26321fc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -52,21 +52,22 @@
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = {
 
-    val dfOpt = try {
-      val dtSql = dataSql()
-      val prop = new java.util.Properties
-      prop.setProperty("user", user)
-      prop.setProperty("password", password)
-      prop.setProperty("driver", driver)
-      val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
-      val dfOpt = Some(df)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
-        None
-    }
+    val dfOpt =
+      try {
+        val dtSql = dataSql()
+        val prop = new java.util.Properties
+        prop.setProperty("user", user)
+        prop.setProperty("password", password)
+        prop.setProperty("driver", driver)
+        val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } catch {
+        case e: Throwable =>
+          error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
+          None
+      }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 0c501dc..b0be557 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -51,23 +51,24 @@
     }
     ds.foreachRDD((rdd, time) => {
       val ms = time.milliseconds
-      val saveDfOpt = try {
-        // coalesce partition number
-        val prlCount = rdd.sparkContext.defaultParallelism
-        val ptnCount = rdd.getNumPartitions
-        val repartitionedRdd = if (prlCount < ptnCount) {
-          rdd.coalesce(prlCount)
-        } else rdd
+      val saveDfOpt =
+        try {
+          // coalesce partition number
+          val prlCount = rdd.sparkContext.defaultParallelism
+          val ptnCount = rdd.getNumPartitions
+          val repartitionedRdd = if (prlCount < ptnCount) {
+            rdd.coalesce(prlCount)
+          } else rdd
 
-        val dfOpt = transform(repartitionedRdd)
+          val dfOpt = transform(repartitionedRdd)
 
-        // pre-process
-        preProcess(dfOpt, ms)
-      } catch {
-        case e: Throwable =>
-          error(s"streaming data connector error: ${e.getMessage}")
-          None
-      }
+          // pre-process
+          preProcess(dfOpt, ms)
+        } catch {
+          case e: Throwable =>
+            error(s"streaming data connector error: ${e.getMessage}")
+            None
+        }
 
       // save data frame
       streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index d8cce41..3134b44 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -63,7 +63,13 @@
 
       def func(): (Long, Future[Boolean]) = {
         import scala.concurrent.ExecutionContext.Implicits.global
-        (timeStamp, Future(HttpUtil.doHttpRequest(api, method, params, header, data)))
+        // Issue the request inside the Future: evaluating it eagerly here would run it on the
+        // caller even for non-block tasks and throw from func() instead of failing the Future.
+        val succeeded = Future {
+          val code = HttpUtil.doHttpRequest(api, method, params, header, data)._1
+          code >= 200 && code < 300
+        }
+        (timeStamp, succeeded)
       }
       if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout)
       else SinkTaskRunner.addNonBlockTask(func _, retry)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
index 743bba9..9aa27f7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
@@ -43,7 +43,7 @@
             v :+ se
           case _ => v
         }
-    }
+      }
   val combSelectionExprs: (Seq[SelectionExpr], Seq[SelectionExpr]) => Seq[SelectionExpr] =
     (a: Seq[SelectionExpr], b: Seq[SelectionExpr]) => a ++ b
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 4f4bf99..980bded 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -33,20 +33,21 @@
   def execute(context: DQContext): Try[Boolean] = Try {
     context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
       val (t, metric) = pair
-      val pr = try {
-        context.getSinks(t).foreach { sink =>
-          try {
-            sink.sinkMetrics(metric)
-          } catch {
-            case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+      val pr =
+        try {
+          context.getSinks(t).foreach { sink =>
+            try {
+              sink.sinkMetrics(metric)
+            } catch {
+              case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+            }
           }
+          true
+        } catch {
+          case e: Throwable =>
+            error(s"flush metrics error: ${e.getMessage}", e)
+            false
         }
-        true
-      } catch {
-        case e: Throwable =>
-          error(s"flush metrics error: ${e.getMessage}", e)
-          false
-      }
       ret && pr
     }
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
index b7f50e8..680f7a8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
@@ -38,13 +38,14 @@
         fsMap.get(uri.getScheme) match {
           case Some(fs) => fs
           case _ =>
-            val fs = try {
-              FileSystem.get(uri, getConfiguration)
-            } catch {
-              case e: Throwable =>
-                error(s"get file system error: ${e.getMessage}", e)
-                throw e
-            }
+            val fs =
+              try {
+                FileSystem.get(uri, getConfiguration)
+              } catch {
+                case e: Throwable =>
+                  error(s"get file system error: ${e.getMessage}", e)
+                  throw e
+              }
             fsMap += (uri.getScheme -> fs)
             fs
         }
@@ -60,11 +61,12 @@
   }
 
   private def getUriOpt(path: String): Option[URI] = {
-    val uriOpt = try {
-      Some(new URI(path))
-    } catch {
-      case _: Throwable => None
-    }
+    val uriOpt =
+      try {
+        Some(new URI(path))
+      } catch {
+        case _: Throwable => None
+      }
     uriOpt.flatMap { uri =>
       if (uri.getScheme == null) {
         try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
index 66648d0..652bbf7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -19,10 +19,10 @@
 
 import scala.util.matching.Regex
 
-import org.apache.http.client.methods.{HttpGet, HttpPost}
+import org.apache.http.client.methods.{HttpDelete, HttpGet, HttpPost, HttpPut}
+import org.apache.http.client.utils.URIBuilder
 import org.apache.http.entity.{ContentType, StringEntity}
-import org.apache.http.impl.client.HttpClientBuilder
-import scalaj.http._
+import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
 
 object HttpUtil {
 
@@ -31,65 +31,53 @@
   val PUT_REGEX: Regex = """^(?i)put$""".r
   val DELETE_REGEX: Regex = """^(?i)delete$""".r
 
-  def postData(
-      url: String,
-      params: Map[String, Object],
-      headers: Map[String, Object],
-      data: String): Boolean = {
-    val response = Http(url)
-      .params(convertObjMap2StrMap(params))
-      .headers(convertObjMap2StrMap(headers))
-      .postData(data)
-      .asString
-
-    response.isSuccess
-  }
-
+  /**
+   * Sends an http request and returns its status code together with the trimmed body.
+   *
+   * @param url     request url; `params` are set on it as query parameters
+   * @param method  http method, matched against the POST/PUT/GET/DELETE regexes
+   * @param params  query parameters
+   * @param headers request headers
+   * @param data    request body, sent as json for post and put only
+   * @return (status code, trimmed body); the body is "" when the response has no entity
+   * @throws UnsupportedOperationException if the method matches none of the regexes
+   */
   def doHttpRequest(
       url: String,
       method: String,
       params: Map[String, Object],
       headers: Map[String, Object],
-      data: String): Boolean = {
+      data: String): (Integer, String) = {
     val client = HttpClientBuilder.create.build
-    method match {
+    val uriBuilder = new URIBuilder(url)
+    convertObjMap2StrMap(params) foreach (param => uriBuilder.setParameter(param._1, param._2))
+    val handler = new BasicResponseHandler()
+    val request = method match {
       case POST_REGEX() =>
-        val post = new HttpPost(url)
-        convertObjMap2StrMap(headers) foreach (header => post.addHeader(header._1, header._2))
+        val post = new HttpPost(uriBuilder.build())
         post.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON))
-
-        // send the post request
-        val response = client.execute(post)
-        val code = response.getStatusLine.getStatusCode
-        code >= 200 && code < 300
+        post
       case PUT_REGEX() =>
-        val get = new HttpGet(url)
-        convertObjMap2StrMap(headers) foreach (header => get.addHeader(header._1, header._2))
-        val response = client.execute(get)
-        val code = response.getStatusLine.getStatusCode
-        code >= 200 && code < 300
-      case _ => false
+        val put = new HttpPut(uriBuilder.build())
+        put.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON))
+        put
+      case GET_REGEX() =>
+        new HttpGet(uriBuilder.build())
+      case DELETE_REGEX() =>
+        new HttpDelete(uriBuilder.build())
+      case _ => throw new UnsupportedOperationException("Unsupported http method error!")
     }
-  }
-
-  def httpRequest(
-      url: String,
-      method: String,
-      params: Map[String, Object],
-      headers: Map[String, Object],
-      data: String): Boolean = {
-    val httpReq = Http(url)
-      .params(convertObjMap2StrMap(params))
-      .headers(convertObjMap2StrMap(headers))
-    method match {
-      case POST_REGEX() =>
-        val res = httpReq.postData(data).asString
-        res.isSuccess
-      case PUT_REGEX() =>
-        val res = httpReq.put(data).asString
-        res.isSuccess
-      case _ => false
-    }
+    convertObjMap2StrMap(headers) foreach (header => request.addHeader(header._1, header._2))
+    try {
+      val response = client.execute(request)
+      // handleResponse returns null for a response without an entity (e.g. 204 No Content),
+      // so wrap it in Option before trimming instead of failing with a NullPointerException
+      val body = Option(handler.handleResponse(response)).map(_.trim).getOrElse("")
+      (response.getStatusLine.getStatusCode, body)
+    } finally {
+      // release the connection pool held by this per-call client, also when execute throws
+      client.close()
+    }
   }
 
   private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
index 52c5f38..bc1e292 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
@@ -21,10 +21,6 @@
 
 import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
 import scala.concurrent.duration.Duration
-import scala.concurrent.forkjoin.{
-  ForkJoinPool => SForkJoinPool,
-  ForkJoinWorkerThread => SForkJoinWorkerThread
-}
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
@@ -172,15 +168,15 @@
   /**
    * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
    */
-  def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+  def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
     // Custom factory to set thread names
-    val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
-      override def newThread(pool: SForkJoinPool): SForkJoinWorkerThread =
-        new SForkJoinWorkerThread(pool) {
+    val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
+      override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
+        new ForkJoinWorkerThread(pool) {
           setName(prefix + "-" + super.getName)
         }
     }
-    new SForkJoinPool(
+    new ForkJoinPool(
       maxThreadNumber,
       factory,
       null, // handler
diff --git a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
index dbed89c..011122f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
@@ -22,9 +22,10 @@
 import org.apache.commons.io.FileUtils
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
 
-trait SparkSuiteBase extends FlatSpec with BeforeAndAfterAll {
+trait SparkSuiteBase extends AnyFlatSpec with BeforeAndAfterAll {
 
   @transient var spark: SparkSession = _
   @transient var sc: SparkContext = _
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
index efaa91f..8f191bc 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
@@ -17,7 +17,9 @@
 
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest._
+import flatspec.AnyFlatSpec
+import matchers.should._
 
 import org.apache.griffin.measure.configuration.dqdefinition.{
   DQConfig,
@@ -26,7 +28,7 @@
   RuleParam
 }
 
-class ParamEnumReaderSpec extends FlatSpec with Matchers {
+class ParamEnumReaderSpec extends AnyFlatSpec with Matchers {
   import org.apache.griffin.measure.configuration.enums.DslType._
   "dsltype" should "be parsed to predefined set of values" in {
     val validDslSparkSqlValues =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index a05df90..94c7e07 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -22,8 +22,10 @@
 
 import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
 import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-
-class ParamFileReaderSpec extends FlatSpec with Matchers {
+import org.scalatest._
+import flatspec.AnyFlatSpec
+import matchers.should._
+class ParamFileReaderSpec extends AnyFlatSpec with Matchers {
 
   "params " should "be parsed from a valid file" in {
     val reader: ParamReader =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 86d68b5..d8bf125 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -18,14 +18,15 @@
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
 import scala.io.Source
-
-import org.scalatest.{FlatSpec, Matchers}
 import scala.util.{Failure, Success}
 
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
 import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
 import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
 
-class ParamJsonReaderSpec extends FlatSpec with Matchers {
+class ParamJsonReaderSpec extends AnyFlatSpec with Matchers {
 
   "params " should "be parsed from a valid file" in {
     val bufferedSource =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
index 898c8fd..6f5717a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
@@ -19,11 +19,12 @@
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.types._
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.SparkSuiteBase
 
-class DataFrameCacheTest extends FlatSpec with Matchers with SparkSuiteBase {
+class DataFrameCacheTest extends AnyFlatSpec with Matchers with SparkSuiteBase {
 
   def createDataFrame(arr: Seq[Int]): DataFrame = {
     val schema = StructType(
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
index 7d25cb1..9a63abd 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
@@ -17,9 +17,10 @@
 
 package org.apache.griffin.measure.context
 
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
-class MetricWrapperTest extends FlatSpec with Matchers {
+class MetricWrapperTest extends AnyFlatSpec with Matchers {
 
   "metric wrapper" should "flush empty if no metric inserted" in {
     val metricWrapper = MetricWrapper("name", "appId")
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
index 28c195b..9648d96 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
@@ -17,9 +17,10 @@
 
 package org.apache.griffin.measure.context
 
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
-class TimeRangeTest extends FlatSpec with Matchers {
+class TimeRangeTest extends AnyFlatSpec with Matchers {
 
   "time range" should "be able to merge another time range" in {
     val tr1 = TimeRange(1, 10, Set(2, 5, 8))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
index a90768e..c60b439 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
@@ -17,9 +17,10 @@
 
 package org.apache.griffin.measure.datasource
 
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
-class TimestampStorageTest extends FlatSpec with Matchers {
+class TimestampStorageTest extends AnyFlatSpec with Matchers {
 
   "timestamp storage" should "be able to insert a timestamp" in {
     val timestampStorage = TimestampStorage()
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
index 466196b..fd4e6a5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
@@ -23,7 +23,7 @@
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.InputDStream
-import org.scalatest.FlatSpec
+import org.scalatest.flatspec.AnyFlatSpec
 
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
@@ -75,7 +75,7 @@
   override def data(ms: Long): (Option[DataFrame], TimeRange) = null
 }
 
-class DataConnectorFactorySpec extends FlatSpec {
+class DataConnectorFactorySpec extends AnyFlatSpec {
 
   "DataConnectorFactory" should "be able to create custom batch connector" in {
     val param = DataConnectorParam(
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
index 60c50ca..37c63fc 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
@@ -18,13 +18,15 @@
 package org.apache.griffin.measure.datasource.connector.batch
 
 import org.apache.spark.sql.types.StructType
-import org.scalatest.Matchers
+import org.scalatest.matchers.should._
+import org.scalatest.Ignore
 import org.testcontainers.elasticsearch.ElasticsearchContainer
 
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.datasource.TimestampStorage
 
+@Ignore
 class ElasticSearchDataConnectorTest extends SparkSuiteBase with Matchers {
 
   // ignorance flag that could skip cases
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
index 2493df3..2ae65be 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -18,7 +18,7 @@
 package org.apache.griffin.measure.datasource.connector.batch
 
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
-import org.scalatest._
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -65,13 +65,12 @@
     // valid schema
     val result1 = FileBasedDataConnector(
       spark,
-      dcParam.copy(
-        config = configs + (
-          (
-            "schema",
-            Seq(
-              Map("name" -> "name", "type" -> "string"),
-              Map("name" -> "age", "type" -> "int", "nullable" -> "true"))))),
+      dcParam.copy(config = configs + (
+        (
+          "schema",
+          Seq(
+            Map("name" -> "name", "type" -> "string"),
+            Map("name" -> "age", "type" -> "int", "nullable" -> "true"))))),
       timestampStorage)
       .data(1L)
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
index 77040e3..8a755b6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -19,7 +19,7 @@
 import java.sql.DriverManager
 import java.util.Properties
 
-import org.scalatest.Matchers
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index a557dda..9fc9883 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -19,7 +19,7 @@
 
 import scala.util.{Failure, Success}
 
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
 import org.apache.griffin.measure.Application._
@@ -30,12 +30,7 @@
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
 
-class DQAppTest
-    extends FlatSpec
-    with SparkSuiteBase
-    with BeforeAndAfterAll
-    with Matchers
-    with Loggable {
+class DQAppTest extends SparkSuiteBase with Matchers with Loggable {
 
   var envParam: EnvConfig = _
   var sparkParam: SparkParam = _
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index e4754e0..8bf81b1 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -57,7 +57,7 @@
       }
       sinks.headOption match {
         case Some(sink: CustomSink) => sink.allMetrics
-        case _ => mutable.ListBuffer[String]()
+        case _ => Map.empty
       }
     })
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
index 919183b..b13858f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -19,14 +19,15 @@
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.types._
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext}
 
-trait SinkTestBase extends FlatSpec with Matchers with SparkSuiteBase with Loggable {
+trait SinkTestBase extends AnyFlatSpec with Matchers with SparkSuiteBase with Loggable {
 
   var sinkParams: Seq[SinkParam]
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index 834d8e0..5892c11 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -17,15 +17,17 @@
 
 package org.apache.griffin.measure.step
 
-import org.scalatest._
 import scala.util.Try
 
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
 import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
 import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext}
 import org.apache.griffin.measure.step.transform.TransformStep
 
-class TransformStepTest extends FlatSpec with Matchers with SparkSuiteBase with Loggable {
+class TransformStepTest extends AnyFlatSpec with Matchers with SparkSuiteBase with Loggable {
 
   case class DualTransformStep(
       name: String,
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
index 67e5236..f80fb13 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
@@ -17,15 +17,15 @@
 
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.scalatest._
-import org.scalatest.mockito.MockitoSugar
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+import org.scalatestplus.mockito.MockitoSugar
 
-import org.apache.griffin.measure.configuration.dqdefinition.RuleErrorConfParam
-import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.dqdefinition.{RuleErrorConfParam, RuleParam}
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.builder.dsl.expr.Expr
 
-class CompletenessExpr2DQStepsTest extends FlatSpec with Matchers with MockitoSugar {
+class CompletenessExpr2DQStepsTest extends AnyFlatSpec with Matchers with MockitoSugar {
 
   "CompletenessExpr2DQSteps" should "get correct where clause" in {
     val completeness = CompletenessExpr2DQSteps(mock[DQContext], mock[Expr], mock[RuleParam])
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index 2f06ad3..5b019a5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -18,7 +18,8 @@
 package org.apache.griffin.measure.transformations
 
 import org.apache.spark.sql.DataFrame
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition._
@@ -29,7 +30,10 @@
 
 case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double)
 
-class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with SparkSuiteBase {
+class AccuracyTransformationsIntegrationTest
+    extends AnyFlatSpec
+    with Matchers
+    with SparkSuiteBase {
   private val EMPTY_PERSON_TABLE = "empty_person"
   private val PERSON_TABLE = "person"
 
@@ -80,10 +84,9 @@
       sourceName: String,
       targetName: String,
       expectedResult: AccuracyResult) = {
-    val dqContext: DQContext = getDqContext(
-      dataSourcesParam = List(
-        DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
-        DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
+    val dqContext: DQContext = getDqContext(dataSourcesParam = List(
+      DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
+      DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
 
     val accuracyRule = RuleParam(
       dslType = "griffin-dsl",
diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
index 720f9b2..9f204a8 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
@@ -17,11 +17,13 @@
 
 package org.apache.griffin.measure.utils
 
-import org.scalatest._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.utils.ParamUtil._
 
-class ParamUtilTest extends FlatSpec with Matchers with BeforeAndAfter {
+class ParamUtilTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
 
   val fruits: Map[String, Any] =
     Map[String, Any]("A" -> "apple", "B" -> "banana", "O" -> "orange")
diff --git a/pom.xml b/pom.xml
index 7a68625..e51c94e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
     <groupId>org.apache.griffin</groupId>
     <artifactId>griffin</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>0.7.0-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>Apache Griffin ${project.version}</name>
     <url>http://griffin.apache.org</url>
@@ -40,13 +40,23 @@
     </prerequisites>
 
     <properties>
+        <encoding>UTF-8</encoding>
+        <project.build.sourceEncoding>${encoding}</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>${encoding}</project.reporting.outputEncoding>
+
         <java.version>1.8</java.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala211.binary.version>2.11</scala211.binary.version>
+        <scala.version>${scala.binary.version}.0</scala.version>
+
+        <maven.compiler.source>${java.version}</maven.compiler.source>
+        <maven.compiler.target>${java.version}</maven.compiler.target>
+
         <maven-apache-rat.version>0.11</maven-apache-rat.version>
         <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <maven.compiler.source>1.8</maven.compiler.source>
-        <maven.compiler.target>1.8</maven.compiler.target>
+
+        <compile.scope>compile</compile.scope>
+        <provided.scope>provided</provided.scope>
     </properties>
 
     <licenses>
@@ -109,7 +119,7 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-compiler-plugin</artifactId>
-                    <version>3.6.1</version>
+                    <version>3.8.1</version>
                     <configuration>
                         <source>${maven.compiler.source}</source>
                         <target>${maven.compiler.target}</target>
@@ -119,10 +129,16 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
-                    <version>2.22.0</version>
+                    <version>3.0.0-M3</version>
                     <configuration>
+                        <testFailureIgnore>false</testFailureIgnore>
+                        <useSystemClassLoader>true</useSystemClassLoader>
+                        <forkMode>once</forkMode>
+                        <failIfNoTests>false</failIfNoTests>
                         <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                        <argLine>-ea -Xmx1g -Xss4m -XX:ReservedCodeCacheSize=128m</argLine>
+                        <systemProperties>
+                            <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+                        </systemProperties>
                     </configuration>
                     <executions>
                         <execution>
@@ -133,24 +149,7 @@
                         </execution>
                     </executions>
                 </plugin>
-                <!-- Scalatest runs all Scala tests -->
-                <!-- enable scalatest -->
-                <plugin>
-                    <groupId>org.scalatest</groupId>
-                    <artifactId>scalatest-maven-plugin</artifactId>
-                    <version>1.0</version>
-                    <configuration>
-                        <argLine>-ea -Xmx1g -Xss4m -XX:ReservedCodeCacheSize=128m</argLine>
-                    </configuration>
-                    <executions>
-                        <execution>
-                            <id>test</id>
-                            <goals>
-                                <goal>test</goal>
-                            </goals>
-                        </execution>
-                    </executions>
-                </plugin>
+
                 <plugin>
                     <groupId>org.apache.rat</groupId>
                     <artifactId>apache-rat-plugin</artifactId>
@@ -196,6 +195,7 @@
                             <exclude>**/src/main/resources/public/**</exclude>
                             <exclude>**/pom.xml.releaseBackup</exclude>
                             <exclude>**/pom.xml.tag</exclude>
+                            <exclude>**/_SUCCESS</exclude>
                         </excludes>
                     </configuration>
                     <executions>
diff --git a/service/pom.xml b/service/pom.xml
index dd2466f..8121998 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <groupId>org.apache.griffin</groupId>
         <artifactId>griffin</artifactId>
-        <version>0.6.0-SNAPSHOT</version>
+        <version>0.7.0-SNAPSHOT</version>
     </parent>
 
     <artifactId>service</artifactId>
@@ -34,7 +34,6 @@
     <properties>
         <hadoop.version>2.7.1</hadoop.version>
         <hive.version>2.2.0</hive.version>
-        <scala.version>2.10</scala.version>
         <spring.boot.version>2.1.7.RELEASE</spring.boot.version>
         <spring.security.kerberos.version>1.0.1.RELEASE</spring.security.kerberos.version>
         <confluent.version>3.2.0</confluent.version>
@@ -258,7 +257,7 @@
         <!--livy-core-->
         <dependency>
             <groupId>com.cloudera.livy</groupId>
-            <artifactId>livy-core_${scala.version}</artifactId>
+            <artifactId>livy-core_2.10</artifactId>
             <version>${livy.core.version}</version>
         </dependency>
 
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index b46ea8b..9eb60ad 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -403,7 +403,7 @@
         Set<String> sets = new HashSet<>();
         List<DataSource> sources = measure.getDataSources();
         for (DataSource source : sources) {
-            source.getConnectors().forEach(dc -> sets.add(dc.getName()));
+            sets.add(source.getConnector().getName());
         }
         if (sets.size() < sources.size()) {
             LOGGER.warn("Connector names cannot be repeated.");
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index 3899147..344cf75 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -146,8 +146,7 @@
         jobStartTime = triggerTime.getTime();
     }
 
-    private void setSourcesPartitionsAndPredicates(List<DataSource> sources)
-        throws Exception {
+    private void setSourcesPartitionsAndPredicates(List<DataSource> sources) {
         boolean isFirstBaseline = true;
         for (JobDataSegment jds : job.getSegments()) {
             if (jds.isAsTsBaseline() && isFirstBaseline) {
@@ -157,22 +156,14 @@
                 isFirstBaseline = false;
             }
             for (DataSource ds : sources) {
-                setDataSourcePartitions(jds, ds);
+                setDataConnectorPartitions(jds, ds.getConnector());
             }
         }
     }
 
-    private void setDataSourcePartitions(JobDataSegment jds, DataSource ds)
-        throws Exception {
-        List<DataConnector> connectors = ds.getConnectors();
-        for (DataConnector dc : connectors) {
-            setDataConnectorPartitions(jds, dc);
-        }
-    }
-
     private void setDataConnectorPartitions(
         JobDataSegment jds,
-        DataConnector dc) throws Exception {
+        DataConnector dc) {
         String dcName = jds.getDataConnectorName();
         if (dcName.equals(dc.getName())) {
             Long[] sampleTs = genSampleTs(jds.getSegmentRange(), dc);
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index 970977b..378b397 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -28,21 +28,10 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
-import javax.persistence.CascadeType;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.FetchType;
-import javax.persistence.JoinColumn;
-import javax.persistence.OneToMany;
-import javax.persistence.PostLoad;
-import javax.persistence.PrePersist;
-import javax.persistence.PreUpdate;
-import javax.persistence.Transient;
+import javax.persistence.*;
 
 import org.apache.griffin.core.util.JsonUtil;
-import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
 @Entity
@@ -51,10 +40,10 @@
 
     private String name;
 
-    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
+    @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST,
         CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "data_source_id")
-    private List<DataConnector> connectors = new ArrayList<>();
+    private DataConnector connector = new DataConnector();
 
     private boolean baseline = false;
 
@@ -75,15 +64,12 @@
         this.name = name;
     }
 
-    public List<DataConnector> getConnectors() {
-        return connectors;
+    public DataConnector getConnector() {
+        return connector;
     }
 
-    public void setConnectors(List<DataConnector> connectors) {
-        if (CollectionUtils.isEmpty(connectors)) {
-            throw new NullPointerException("Data connector can not be empty.");
-        }
-        this.connectors = connectors;
+    public void setConnector(DataConnector connector) {
+        this.connector = connector;
     }
 
     public boolean isBaseline() {
@@ -132,18 +118,18 @@
     public DataSource() {
     }
 
-    public DataSource(String name, List<DataConnector> connectors) {
+    public DataSource(String name, DataConnector connector) {
         this.name = name;
-        this.connectors = connectors;
+        this.connector = connector;
     }
 
     public DataSource(String name, boolean baseline,
                       Map<String, Object> checkpointMap,
-                      List<DataConnector> connectors) {
+                      DataConnector connector) {
         this.name = name;
         this.baseline = baseline;
         this.checkpointMap = checkpointMap;
-        this.connectors = connectors;
+        this.connector = connector;
 
     }
 }
diff --git a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
index d855183..4b83907 100644
--- a/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/metastore/hive/HiveMetaStoreServiceImpl.java
@@ -49,7 +49,7 @@
     private static final Logger LOGGER = LoggerFactory
         .getLogger(HiveMetaStoreService.class);
 
-    @Autowired
+    @Autowired(required = false)
     private IMetaStoreClient client = null;
 
     @Value("${hive.metastore.dbname}")
diff --git a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
index 7e57d87..59526bd 100644
--- a/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/MeasureUtil.java
@@ -42,7 +42,7 @@
 
 public class MeasureUtil {
     private static final Logger LOGGER = LoggerFactory
-        .getLogger(MeasureUtil.class);
+            .getLogger(MeasureUtil.class);
 
     public static void validateMeasure(Measure measure) {
         if (measure instanceof GriffinMeasure) {
@@ -56,7 +56,7 @@
     private static void validateGriffinMeasure(GriffinMeasure measure) {
         if (getConnectorNamesIfValid(measure) == null) {
             throw new GriffinException.BadRequestException
-                (INVALID_CONNECTOR_NAME);
+                    (INVALID_CONNECTOR_NAME);
         }
         if (!validatePredicates(measure)) {
             throw new GriffinException.BadRequestException(INVALID_MEASURE_PREDICATE);
@@ -65,13 +65,11 @@
 
     private static boolean validatePredicates(GriffinMeasure measure) {
         for (DataSource dataSource : measure.getDataSources()) {
-            for (DataConnector dataConnector : dataSource.getConnectors()) {
-                for (SegmentPredicate segmentPredicate : dataConnector.getPredicates()) {
-                    try {
-                        PredicatorFactory.newPredicateInstance(segmentPredicate);
-                    } catch (Exception e) {
-                        return false;
-                    }
+            for (SegmentPredicate segmentPredicate : dataSource.getConnector().getPredicates()) {
+                try {
+                    PredicatorFactory.newPredicateInstance(segmentPredicate);
+                } catch (Exception e) {
+                    return false;
                 }
             }
         }
@@ -81,7 +79,7 @@
     private static void validateExternalMeasure(ExternalMeasure measure) {
         if (StringUtils.isBlank(measure.getMetricName())) {
             LOGGER.warn("Failed to create external measure {}. " +
-                "Its metric name is blank.", measure.getName());
+                    "Its metric name is blank.", measure.getName());
             throw new GriffinException.BadRequestException(MISSING_METRIC_NAME);
         }
     }
@@ -90,8 +88,9 @@
         Set<String> sets = new HashSet<>();
         List<DataSource> sources = measure.getDataSources();
         for (DataSource source : sources) {
-            source.getConnectors().stream().filter(dc -> dc.getName() != null)
-                .forEach(dc -> sets.add(dc.getName()));
+            if (source.getConnector() != null && source.getConnector().getName() != null) {
+                sets.add(source.getConnector().getName());
+            }
         }
         if (sets.size() == 0 || sets.size() < sources.size()) {
             LOGGER.warn("Connector names cannot be repeated or empty.");
diff --git a/service/src/main/resources/env/env_batch.json b/service/src/main/resources/env/env_batch.json
index 72a3839..9ed9ef7 100644
--- a/service/src/main/resources/env/env_batch.json
+++ b/service/src/main/resources/env/env_batch.json
@@ -4,20 +4,23 @@
   },
   "sinks": [
     {
+      "name": "console",
       "type": "CONSOLE",
       "config": {
         "max.log.lines": 10
       }
     },
     {
+      "name": "hdfs",
       "type": "HDFS",
       "config": {
         "path": "hdfs:///griffin/persist",
         "max.persist.lines": 10000,
         "max.lines.per.file": 10000
       }
     },
     {
+      "name": "elasticsearch",
       "type": "ELASTICSEARCH",
       "config": {
         "method": "post",
diff --git a/service/src/main/resources/log4j2-spring.xml b/service/src/main/resources/log4j2-spring.xml
index c021b4a..9cee7a5 100644
--- a/service/src/main/resources/log4j2-spring.xml
+++ b/service/src/main/resources/log4j2-spring.xml
@@ -1,4 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
 <Configuration status="WARN">
     <Properties>
         <Property name="PID">????</Property>
diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
index 00ce1fa..534527f 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
@@ -66,7 +66,7 @@
         GriffinMeasure m = (GriffinMeasure) measures.get(0);
 
         List<DataSource> sources = m.getDataSources();
-        DataConnector connector = sources.get(0).getConnectors().get(0);
+        DataConnector connector = sources.get(0).getConnector();
         Rule rule = m.getEvaluateRule().getRules().get(0);
         assertEquals(m.getSinksList().size(), 2);
         assertEquals(sources.get(0).isBaseline(), true);
diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
index 563210d..6d9f053 100644
--- a/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
+++ b/service/src/test/java/org/apache/griffin/core/util/EntityMocksHelper.java
@@ -82,9 +82,9 @@
         DataConnector dcTarget)
         throws Exception {
         DataSource dataSource = new DataSource(
-            "source", true, createCheckpointMap(), Arrays.asList(dcSource));
+            "source", true, createCheckpointMap(), dcSource);
         DataSource targetSource = new DataSource(
-            "target", false, createCheckpointMap(), Arrays.asList(dcTarget));
+            "target", false, createCheckpointMap(), dcTarget);
         List<DataSource> dataSources = new ArrayList<>();
         dataSources.add(dataSource);
         dataSources.add(targetSource);
diff --git a/ui/angular/package.json b/ui/angular/package.json
index d4fae3d..39c6327 100644
--- a/ui/angular/package.json
+++ b/ui/angular/package.json
@@ -39,7 +39,7 @@
     "zone.js": "^0.8.14"
   },
   "devDependencies": {
-    "@angular/cli": "1.3.0",
+    "@angular/cli": "1.7.4",
     "@angular/compiler-cli": "4.4.4",
     "@angular/language-service": "4.4.4",
     "@types/jasmine": "~2.5.53",
diff --git a/ui/angular/src/app/job/create-job/batch/batch.component.ts b/ui/angular/src/app/job/create-job/batch/batch.component.ts
index 150f307..9049761 100644
--- a/ui/angular/src/app/job/create-job/batch/batch.component.ts
+++ b/ui/angular/src/app/job/create-job/batch/batch.component.ts
@@ -268,23 +268,21 @@
       if (measure == map.name) {
         var source = map["data.sources"];
         for (let i = 0; i < source.length; i++) {
-          var details = source[i].connectors;
-          for (let j = 0; j < details.length; j++) {
-            if (details[j]["data.unit"] != undefined) {
-              var table =
-                details[j].config.database +
-                "." +
-                details[j].config["table.name"];
-              var size = details[j]["data.unit"];
-              var connectorname = details[j]["name"];
-              var detail = {
-                id: i + 1,
-                name: table,
-                size: size,
-                connectorname: connectorname
-              };
-              this.dropdownList.push(detail);
-            }
+          var connector = source[i].connector;
+          if (connector["data.unit"] != undefined) {
+            var table =
+              connector.config.database +
+              "." +
+              connector.config["table.name"];
+            var size = connector["data.unit"];
+            var connectorname = connector["name"];
+            var detail = {
+              id: i + 1,
+              name: table,
+              size: size,
+              connectorname: connectorname
+            };
+            this.dropdownList.push(detail);
           }
         }
       }
diff --git a/ui/angular/src/app/job/create-job/streaming/streaming.component.ts b/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
index 200d788..808631b 100644
--- a/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
+++ b/ui/angular/src/app/job/create-job/streaming/streaming.component.ts
@@ -234,23 +234,21 @@
       if (measure == map.name) {
         var source = map["data.sources"];
         for (let i = 0; i < source.length; i++) {
-          var details = source[i].connectors;
-          for (let j = 0; j < details.length; j++) {
-            if (details[j]["data.unit"] != undefined) {
-              var table =
-                details[j].config.database +
-                "." +
-                details[j].config["table.name"];
-              var size = details[j]["data.unit"];
-              var connectorname = details[j]["name"];
-              var detail = {
-                id: i + 1,
-                name: table,
-                size: size,
-                connectorname: connectorname
-              };
-              this.dropdownList.push(detail);
-            }
+          var connector = source[i].connector;
+          if (connector["data.unit"] != undefined) {
+            var table =
+              connector.config.database +
+              "." +
+              connector.config["table.name"];
+            var size = connector["data.unit"];
+            var connectorname = connector["name"];
+            var detail = {
+              id: i + 1,
+              name: table,
+              size: size,
+              connectorname: connectorname
+            };
+            this.dropdownList.push(detail);
           }
         }
       }
diff --git a/ui/angular/src/app/job/job-detail/job-detail.component.ts b/ui/angular/src/app/job/job-detail/job-detail.component.ts
index 7c4a10b..d857b26 100644
--- a/ui/angular/src/app/job/job-detail/job-detail.component.ts
+++ b/ui/angular/src/app/job/job-detail/job-detail.component.ts
@@ -57,7 +57,7 @@
         this.measureType = this.measureData["dq.type"].toLowerCase();
         this.processType = this.measureData["process.type"].toLowerCase();
         for (let item of this.measureData["data.sources"]) {
-          let config = item.connectors[0].config;
+          let config = item.connector.config;
           let tableName = config.database + "." + config["table.name"];
           this.tableInfo.push(tableName);
         }
diff --git a/ui/angular/src/app/measure/create-measure/ac/ac.component.ts b/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
index 72d0138..8b7bbec 100644
--- a/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
+++ b/ui/angular/src/app/measure/create-measure/ac/ac.component.ts
@@ -148,7 +148,7 @@
     "data.sources": [
       {
         name: "source",
-        connectors: [
+        connector:
           {
             name: "",
             type: "HIVE",
@@ -170,11 +170,10 @@
               }
             ]
           }
-        ]
       },
       {
         name: "target",
-        connectors: [
+        connector:
           {
             name: "",
             type: "HIVE",
@@ -196,7 +195,6 @@
               }
             ]
           }
-        ]
       }
     ],
 
@@ -393,7 +391,7 @@
       "data.sources": [
         {
           name: "source",
-          connectors: [
+          connector:
             {
               name: this.src_name,
               type: "HIVE",
@@ -415,11 +413,10 @@
                 }
               ]
             }
-          ]
         },
         {
           name: "target",
-          connectors: [
+          connector:
             {
               name: this.tgt_name,
               type: "HIVE",
@@ -441,7 +438,6 @@
                 }
               ]
             }
-          ]
         }
       ],
       "evaluate.rule": {
@@ -499,11 +495,11 @@
   }
 
   deleteUnit(index) {
-    delete this.newMeasure["data.sources"][index]["connectors"][0]["data.unit"];
+    delete this.newMeasure["data.sources"][index]["connector"]["data.unit"];
   }
 
   deletePredicates(index) {
-    delete this.newMeasure["data.sources"][index]["connectors"][0]["predicates"];
+    delete this.newMeasure["data.sources"][index]["connector"]["predicates"];
   }
 
   save() {
diff --git a/ui/angular/src/app/measure/create-measure/pr/pr.component.ts b/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
index a1dae5e..57be58f 100644
--- a/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
+++ b/ui/angular/src/app/measure/create-measure/pr/pr.component.ts
@@ -360,7 +360,7 @@
       "data.sources": [
         {
           name: "source",
-          connectors: [
+          connector:
             {
               name: this.step1.srcname,
               type: "HIVE",
@@ -382,7 +382,6 @@
                 }
               ]
             }
-          ]
         }
       ],
       "evaluate.rule": {
@@ -393,10 +392,10 @@
 
     this.getGrouprule();
     if (this.step3.size.indexOf("0") == 0) {
-      delete this.newMeasure["data.sources"][0]["connectors"][0]["data.unit"];
+      delete this.newMeasure["data.sources"][0]["connector"]["data.unit"];
     }
     if (!this.step3.needpath || this.step3.path == "") {
-      delete this.newMeasure["data.sources"][0]["connectors"][0]["predicates"];
+      delete this.newMeasure["data.sources"][0]["connector"]["predicates"];
     }
     this.visible = true;
     setTimeout(() => (this.visibleAnimate = true), 100);
diff --git a/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts b/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
index 6fd2f25..08d9dc4 100644
--- a/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
+++ b/ui/angular/src/app/measure/measure-detail/measure-detail.component.spec.ts
@@ -43,7 +43,7 @@
   });
 
   it(
-    'should be created', 
+    'should be created',
     inject(
       [HttpTestingController, ServiceService],
       (httpMock: HttpTestingController, serviceService: ServiceService) => {
@@ -54,13 +54,13 @@
           "dq.type": "",
           "evaluate.rule": "",
           "data.sources": [{
-            "connectors": [{
+            "connector": {
               "data.unit": "",
               config: {
                 where: ""
               },
               predicates: []
-            }]
+            }
           }]
         });
 
diff --git a/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts b/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
index 5ad5fae..7413af1 100644
--- a/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
+++ b/ui/angular/src/app/measure/measure-detail/measure-detail.component.ts
@@ -64,7 +64,7 @@
   currentrule: string;
 
   fetchData(value, index) {
-    var data = this.ruleData["data.sources"][index].connectors[0];
+    var data = this.ruleData["data.sources"][index].connector;
     var size = value + "size";
     var zone = value + "zone";
     var where = value + "where";
diff --git a/ui/angular/src/app/measure/measure.component.ts b/ui/angular/src/app/measure/measure.component.ts
index 23f7a15..d94dc3c 100644
--- a/ui/angular/src/app/measure/measure.component.ts
+++ b/ui/angular/src/app/measure/measure.component.ts
@@ -72,11 +72,11 @@
     this.deletedRow = row;
     $("#save").removeAttr("disabled");
     if (this.deletedRow["measure.type"] !== "external") {
-      var sourcedata = this.deletedRow["data.sources"][0].connectors[0].config;
+      var sourcedata = this.deletedRow["data.sources"][0].connector.config;
       this.sourceTable = sourcedata["table.name"];
     }
     if (this.deletedRow["dq.type"] === "accuracy") {
-      var targetdata = this.deletedRow["data.sources"][1].connectors[0].config;
+      var targetdata = this.deletedRow["data.sources"][1].connector.config;
       this.targetTable = targetdata["table.name"];
     } else {
       this.targetTable = "";
diff --git a/ui/pom.xml b/ui/pom.xml
index d42cb6a..465da85 100644
--- a/ui/pom.xml
+++ b/ui/pom.xml
@@ -24,7 +24,7 @@
     <parent>
         <groupId>org.apache.griffin</groupId>
         <artifactId>griffin</artifactId>
-        <version>0.6.0-SNAPSHOT</version>
+        <version>0.7.0-SNAPSHOT</version>
     </parent>
     <artifactId>ui</artifactId>
     <packaging>pom</packaging>