[GRIFFIN-323] Refactor Data Source Connector configuration

**What changes were proposed in this pull request?**

This ticket proposes the following changes:

- Remove `version` from `DataConnectorParam`, as it is not used anywhere in the codebase.
- Change `connectors` from an array to a single JSON object. Since a data source named X may only be of one type (Hive, file, etc.), the connector field should not be an array.
- Rename `connectors` to `connector` (see the example below).
- Update the existing config files and documentation for reference.
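
For example, a data source definition (abridged from the sample configs updated in this diff) changes from:

```
"data.sources": [
  {
    "name": "source",
    "connectors": [
      {
        "type": "AVRO",
        "version": "1.7",
        "config": {
          "file.name": "src/test/resources/users_info_src.avro"
        }
      }
    ]
  }
]
```

to a single connector object, with `version` dropped as well:

```
"data.sources": [
  {
    "name": "source",
    "connector": {
      "type": "AVRO",
      "config": {
        "file.name": "src/test/resources/users_info_src.avro"
      }
    }
  }
]
```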

**Does this PR introduce any user-facing change?**
Yes. As described above, the config structure has changed: each data source now takes a single `connector` object instead of a `connectors` array (see the example above), so existing job configuration files must be updated.

**How was this patch tested?**
Griffin test suite.

Author: chitralverma <chitralverma@gmail.com>

Closes #568 from chitralverma/refactor-data-connector-config.
diff --git a/griffin-doc/measure/measure-batch-sample.md b/griffin-doc/measure/measure-batch-sample.md
index f45d040..1aee173 100644
--- a/griffin-doc/measure/measure-batch-sample.md
+++ b/griffin-doc/measure/measure-batch-sample.md
@@ -24,36 +24,30 @@
 ```
 {
   "name": "accu_batch",
-
   "process.type": "BATCH",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "AVRO",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "AVRO",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
-    }, {
+      }
+    },
+    {
       "name": "target",
-      "connectors": [
-        {
-          "type": "AVRO",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_target.avro"
-          }
+      "connector": {
+        "type": "AVRO",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_target.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -76,13 +70,15 @@
           {
             "type": "record",
             "name": "missRecords"
-          }        
-        ]        
+          }
+        ]
       }
     ]
   },
-  
-  "sinks": ["CONSOLE", "ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
 ```
 Above is the configure file of batch accuracy job.  
@@ -99,25 +95,20 @@
 ```
 {
   "name": "prof_batch",
-
   "process.type": "BATCH",
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "HIVE",
-          "version": "1.2",
-          "config": {
-            "database": "default",
-            "table.name": "src"
-          }
+      "connector": {
+        "type": "HIVE",
+        "version": "1.2",
+        "config": {
+          "database": "default",
+          "table.name": "src"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -129,8 +120,8 @@
           {
             "type": "metric",
             "name": "prof"
-          }        
-        ]        
+          }
+        ]
       },
       {
         "dsl.type": "griffin-dsl",
@@ -142,13 +133,15 @@
             "type": "metric",
             "name": "name_grp",
             "flatten": "array"
-          }        
+          }
         ]
       }
     ]
   },
-   
-  "sinks": ["CONSOLE", "ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
 ```
 Above is the configure file of batch profiling job.  
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index 4d7594c..e286ab1 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -120,37 +120,31 @@
 ```
 {
   "name": "accu_batch",
-
   "process.type": "BATCH",
-
   "data.sources": [
     {
       "name": "src",
-      "connectors": [
-        {
-          "type": "AVRO",
-          "version": "1.7",
-          "config": {
-          	"file.path": "<path>/<to>",
-            "file.name": "<source-file>.avro"
-          }
+      "connector": {
+        "type": "AVRO",
+        "version": "1.7",
+        "config": {
+          "file.path": "<path>/<to>",
+          "file.name": "<source-file>.avro"
         }
-      ]
-    }, {
+      }
+    },
+    {
       "name": "tgt",
-      "connectors": [
-        {
-          "type": "AVRO",
-          "version": "1.7",
-          "config": {
-          	"file.path": "<path>/<to>",
-            "file.name": "<target-file>.avro"
-          }
+      "connector": {
+        "type": "AVRO",
+        "version": "1.7",
+        "config": {
+          "file.path": "<path>/<to>",
+          "file.name": "<target-file>.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -164,7 +158,7 @@
           "miss": "miss_count",
           "total": "total_count",
           "matched": "matched_count"
-        },        
+        },
         "out": [
           {
             "type": "metric",
@@ -172,13 +166,16 @@
           },
           {
             "type": "record"
-          }        
+          }
         ]
       }
     ]
   },
-  
-  "sinks": ["CONSOLE", "HTTP", "HDFS"]
+  "sinks": [
+    "CONSOLE",
+    "HTTP",
+    "HDFS"
+  ]
 }
 ```
 Above lists DQ job configure parameters.  
@@ -187,7 +184,7 @@
 - **process.type**: Process type of DQ job, "BATCH" or "STREAMING".
 - **data.sources**: List of data sources in this DQ job.
 	+ name: Name of this data source, it should be different from other data sources.
-	+ connectors: List of data connectors combined as the same data source. Details of data connector configuration [here](#data-connector).
+	+ connector: Data connector for this data source. Details of data connector configuration [here](#data-connector).
 - **evaluate.rule**: Evaluate rule parameters of this DQ job.
 	+ dsl.type: Default dsl type of all the rules.
 	+ rules: List of rules, to define every rule step. Details of rule configuration [here](#rule).
@@ -195,7 +192,7 @@
 
 ### Data Connector
 
-Data Connectors help connector to external sources on which DQ checks can be applied.
+A data connector helps connect to external sources on which DQ checks can be applied.
 
 List of supported data connectors:
  - Hive
@@ -208,8 +205,7 @@
  A sample data connector configuration is as following,
  
  ```
-"connectors": [
-  {
+"connector": {
     "type": "file",
     "version": "1.7",
     "config": {
@@ -217,7 +213,6 @@
       "key2": "value2"
     }
   }
-]
  ```
 
  ##### Key Parameters:
@@ -234,15 +229,13 @@
     + For **Streaming** it should implement StreamingDataConnector trait.
  - Example:
       ```
-     "connectors": [
-       {
+     "connector": {
          "type": "custom",
          "config": {
            "class": "org.apache.griffin.measure.datasource.connector.batch.CassandraDataConnector",
            ...
          }
        }
-     ]
       ```
  
  **Note:** User-provided data connector should be present in Spark job's class path, by either providing custom jar with 
@@ -320,20 +313,32 @@
 
  - Example:
       ```
-     "connectors": [
-       {
+     "connector": {
          "type": "file",
          "config": {
            "format": "csv",
-           "paths": ["/path/to/csv/dir/*", "/path/to/dir/test.csv"],
+           "paths": [
+             "/path/to/csv/dir/*",
+             "/path/to/dir/test.csv"
+           ],
            "options": {
-                "header": "true"
-            },
+             "header": "true"
+           },
            "skipOnError": "false",
-           "schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]
+           "schema": [
+             {
+               "name": "user_id",
+               "type": "string",
+               "nullable": "true"
+             },
+             {
+               "name": "age",
+               "type": "int",
+               "nullable": "false"
+             }
+           ]
          }
        }
-     ]
    
  **Note:** Additional examples of schema:
 - "schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]
@@ -362,8 +367,7 @@
 
 - Example:
    ```
-  "connectors": [
-    {
+  "connector": {
       "type": "jdbc",
       "config": {
         "database": "default",
@@ -374,8 +378,7 @@
         "driver": "com.mysql.jdbc.Driver",
         "where": ""
       }
-    }
-  ]   
+    }
   
 **Note:** Jar containing driver class should be present in Spark job's class path, by either providing custom jar with 
 `--jars` parameter to spark-submit or by adding setting `spark.jars` in `spark -> config` section of environment config.  
diff --git a/griffin-doc/measure/measure-streaming-sample.md b/griffin-doc/measure/measure-streaming-sample.md
index 9e47143..b942778 100644
--- a/griffin-doc/measure/measure-streaming-sample.md
+++ b/griffin-doc/measure/measure-streaming-sample.md
@@ -24,93 +24,93 @@
 ```
 {
   "name": "accu_streaming",
-
   "process.type": "STREAMING",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "KAFKA",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "src_group",
-              "auto.offset.reset": "largest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "KAFKA",
+        "version": "0.8",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.149.247.156:9092",
+            "group.id": "src_group",
+            "auto.offset.reset": "largest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "sss",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "cache": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-2m", "0"],
+        "time.range": [
+          "-2m",
+          "0"
+        ],
         "updatable": true
       }
-    }, {
+    },
+    {
       "name": "target",
-      "connectors": [
-        {
-          "type": "KAFKA",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "tgt_group",
-              "auto.offset.reset": "largest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "KAFKA",
+        "version": "0.8",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.149.247.156:9092",
+            "group.id": "tgt_group",
+            "auto.offset.reset": "largest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "t1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from t1"
-            }
-          ]
-        }
-      ],
+          "topics": "ttt",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "t1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from t1"
+          }
+        ]
+      },
       "cache": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/target",
         "info.path": "target",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-2m", "0"]
+        "time.range": [
+          "-2m",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -132,14 +132,16 @@
           },
           {
             "type": "record",
-            "name": "missRecords",
-          }        
+            "name": "missRecords"
+          }
         ]
       }
     ]
   },
-   
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
 ```
 Above is the configure file of streaming accuracy job.  
@@ -175,52 +177,50 @@
 ```
 {
   "name": "prof_streaming",
-
   "process.type": "STREAMING",
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "KAFKA",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "KAFKA",
+        "version": "0.8",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.149.247.156:9092",
+            "group.id": "group1",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "sss",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "cache": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"]
+        "time.range": [
+          "0",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -232,8 +232,8 @@
           {
             "type": "metric",
             "name": "prof"
-          }        
-        ]        
+          }
+        ]
       },
       {
         "dsl.type": "griffin-dsl",
@@ -245,13 +245,15 @@
             "type": "metric",
             "name": "name_group",
             "flatten": "array"
-          }        
-        ]        
+          }
+        ]
       }
     ]
   },
-      
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
 ```
 Above is the configure file of streaming profiling job.  
diff --git a/griffin-doc/measure/predicates.md b/griffin-doc/measure/predicates.md
index 389edbe..87f1a0d 100644
--- a/griffin-doc/measure/predicates.md
+++ b/griffin-doc/measure/predicates.md
@@ -31,7 +31,7 @@
     ...
      "data.sources": [
         ...
-         "connectors": [
+         "connector": {
                    "predicates": [
                        {
                          "type": "file.exist",
@@ -42,8 +42,7 @@
                        }
                    ],
          ...
-         
-     ]
+         }
 }
 ```
 
diff --git a/griffin-doc/service/api-guide.md b/griffin-doc/service/api-guide.md
index 26435d6..482dbd3 100644
--- a/griffin-doc/service/api-guide.md
+++ b/griffin-doc/service/api-guide.md
@@ -163,137 +163,131 @@
 curl -k -H "Content-Type: application/json" -H "Accept: application/json" \
 -X POST http://127.0.0.1:8080/api/v1/measures \
 -d '{
-    "name":"profiling_measure",
-    "measure.type":"griffin",
-    "dq.type":"PROFILING",
-    "rule.description":{
-        "details":[
-            {
-                "name":"age",
-                "infos":"Total Count,Average"
-            }
+      "name": "profiling_measure",
+      "measure.type": "griffin",
+      "dq.type": "PROFILING",
+      "rule.description": {
+        "details": [
+          {
+            "name": "age",
+            "infos": "Total Count,Average"
+          }
         ]
-    },
-    "process.type":"BATCH",
-    "owner":"test",
-    "description":"measure description",
-    "data.sources":[
+      },
+      "process.type": "BATCH",
+      "owner": "test",
+      "description": "measure description",
+      "data.sources": [
         {
-            "name":"source",
-            "connectors":[
-                {
-                    "name":"connector_name",
-                    "type":"HIVE",
-                    "version":"1.2",
-                    "data.unit":"1hour",
-                    "data.time.zone":"UTC(WET,GMT)",
-                    "config":{
-                        "database":"default",
-                        "table.name":"demo_src",
-                        "where":"dt=#YYYYMMdd# AND hour=#HH#"
-                    },
-                    "predicates":[
-                        {
-                            "type":"file.exist",
-                            "config":{
-                                "root.path":"hdfs:///griffin/demo_src",
-                                "path":"/dt=#YYYYMMdd#/hour=#HH#/_DONE"
-                            }
-                        }
-                    ]
+          "name": "source",
+          "connector": {
+            "name": "connector_name",
+            "type": "HIVE",
+            "version": "1.2",
+            "data.unit": "1hour",
+            "data.time.zone": "UTC(WET,GMT)",
+            "config": {
+              "database": "default",
+              "table.name": "demo_src",
+              "where": "dt=#YYYYMMdd# AND hour=#HH#"
+            },
+            "predicates": [
+              {
+                "type": "file.exist",
+                "config": {
+                  "root.path": "hdfs:///griffin/demo_src",
+                  "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
                 }
+              }
             ]
+          }
         }
-    ],
-    "evaluate.rule":{
-        "rules":[
-            {
-                "dsl.type":"griffin-dsl",
-                "dq.type":"PROFILING",
-                "rule":"count(source.`age`) AS `age-count`,avg(source.`age`) AS `age-average`",
-                "name":"profiling",
-                "details":{}
-            }
+      ],
+      "evaluate.rule": {
+        "rules": [
+          {
+            "dsl.type": "griffin-dsl",
+            "dq.type": "PROFILING",
+            "rule": "count(source.`age`) AS `age-count`,avg(source.`age`) AS `age-average`",
+            "name": "profiling",
+            "details": {}
+          }
         ]
-    }
-}'
+      }
+    }'
 ```
 Here is an example to define measure of accuracy:
 ```
 curl -k -H "Content-Type: application/json" -H "Accept: application/json" \
 -X POST http://127.0.0.1:8080/api/v1/measures \
 -d '{
-    "name":"accuracy_measure",
-    "measure.type":"griffin",
-    "dq.type":"ACCURACY",
-    "process.type":"BATCH",
-    "owner":"test",
-    "description":"measure description",
-    "data.sources":[
+      "name": "accuracy_measure",
+      "measure.type": "griffin",
+      "dq.type": "ACCURACY",
+      "process.type": "BATCH",
+      "owner": "test",
+      "description": "measure description",
+      "data.sources": [
         {
-            "name":"source",
-            "connectors":[
-                {
-                    "name":"connector_name_source",
-                    "type":"HIVE",
-                    "version":"1.2",
-                    "data.unit":"1hour",
-                    "data.time.zone":"UTC(WET,GMT)",
-                    "config":{
-                        "database":"default",
-                        "table.name":"demo_src",
-                        "where":"dt=#YYYYMMdd# AND hour=#HH#"
-                    },
-                    "predicates":[
-                        {
-                            "type":"file.exist",
-                            "config":{
-                                "root.path":"hdfs:///griffin/demo_src",
-                                "path":"/dt=#YYYYMMdd#/hour=#HH#/_DONE"
-                            }
-                        }
-                    ]
+          "name": "source",
+          "connector": {
+            "name": "connector_name_source",
+            "type": "HIVE",
+            "version": "1.2",
+            "data.unit": "1hour",
+            "data.time.zone": "UTC(WET,GMT)",
+            "config": {
+              "database": "default",
+              "table.name": "demo_src",
+              "where": "dt=#YYYYMMdd# AND hour=#HH#"
+            },
+            "predicates": [
+              {
+                "type": "file.exist",
+                "config": {
+                  "root.path": "hdfs:///griffin/demo_src",
+                  "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
                 }
+              }
             ]
+          }
         },
         {
-            "name":"target",
-            "connectors":[
-                {
-                    "name":"connector_name_target",
-                    "type":"HIVE",
-                    "version":"1.2",
-                    "data.unit":"1hour",
-                    "data.time.zone":"UTC(WET,GMT)",
-                    "config":{
-                        "database":"default",
-                        "table.name":"demo_tgt",
-                        "where":"dt=#YYYYMMdd# AND hour=#HH#"
-                    },
-                    "predicates":[
-                        {
-                            "type":"file.exist",
-                            "config":{
-                                "root.path":"hdfs:///griffin/demo_src",
-                                "path":"/dt=#YYYYMMdd#/hour=#HH#/_DONE"
-                            }
-                        }
-                    ]
+          "name": "target",
+          "connector": {
+            "name": "connector_name_target",
+            "type": "HIVE",
+            "version": "1.2",
+            "data.unit": "1hour",
+            "data.time.zone": "UTC(WET,GMT)",
+            "config": {
+              "database": "default",
+              "table.name": "demo_tgt",
+              "where": "dt=#YYYYMMdd# AND hour=#HH#"
+            },
+            "predicates": [
+              {
+                "type": "file.exist",
+                "config": {
+                  "root.path": "hdfs:///griffin/demo_src",
+                  "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
                 }
+              }
             ]
+          }
         }
-    ],
-    "evaluate.rule":{
-        "rules":[
-            {
-                "dsl.type":"griffin-dsl",
-                "dq.type":"ACCURACY",
-                "name":"accuracy",
-                "rule":"source.desc=target.desc"
-            }
+      ],
+      "evaluate.rule": {
+        "rules": [
+          {
+            "dsl.type": "griffin-dsl",
+            "dq.type": "ACCURACY",
+            "name": "accuracy",
+            "rule": "source.desc=target.desc"
+          }
         ]
-    }
-}'
+      }
+    }'
 ```
 Here is an example to define external measure:
 ```
@@ -319,84 +313,90 @@
 #### API Example
 ```bash
 curl -k -H "Accept: application/json" -X GET http://127.0.0.1:8080/api/v1/measures
-[{
-        "measure.type": "griffin",
-        "id": 1,
-        "name": "accuracy_measure",
-        "owner": "test",
-        "description": "measure description",
-        "deleted": false,
-        "dq.type": "ACCURACY",
-        "sinks": ["ELASTICSEARCH", "HDFS"],
-        "process.type": "BATCH",
-        "data.sources": [{
-                "id": 4,
-                "name": "source",
-                "connectors": [{
-                        "id": 5,
-                        "name": "connector_name_source",
-                        "type": "HIVE",
-                        "version": "1.2",
-                        "predicates": [{
-                                "id": 6,
-                                "type": "file.exist",
-                                "config": {
-                                    "root.path": "hdfs:///127.0.0.1/demo_src",
-                                    "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
-                                }
-                            }
-                        ],
-                        "data.unit": "1hour",
-                        "data.time.zone": "UTC(WET,GMT)",
-                        "config": {
-                            "database": "default",
-                            "table.name": "demo_src",
-                            "where": "dt=#YYYYMMdd# AND hour=#HH#"
-                        }
-                    }
-                ],
-                "baseline": false
-            }, {
-                "id": 7,
-                "name": "target",
-                "connectors": [{
-                        "id": 8,
-                        "name": "connector_name_target",
-                        "type": "HIVE",
-                        "version": "1.2",
-                        "predicates": [{
-                                "id": 9,
-                                "type": "file.exist",
-                                "config": {
-                                    "root.path": "hdfs:///127.0.0.1/demo_src",
-                                    "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
-                                }
-                            }
-                        ],
-                        "data.unit": "1hour",
-                        "data.time.zone": "UTC(WET,GMT)",
-                        "config": {
-                            "database": "default",
-                            "table.name": "demo_tgt",
-                            "where": "dt=#YYYYMMdd# AND hour=#HH#"
-                        }
-                    }
-                ],
-                "baseline": false
+[
+  {
+    "measure.type": "griffin",
+    "id": 1,
+    "name": "accuracy_measure",
+    "owner": "test",
+    "description": "measure description",
+    "deleted": false,
+    "dq.type": "ACCURACY",
+    "sinks": [
+      "ELASTICSEARCH",
+      "HDFS"
+    ],
+    "process.type": "BATCH",
+    "data.sources": [
+      {
+        "id": 4,
+        "name": "source",
+        "connector": {
+          "id": 5,
+          "name": "connector_name_source",
+          "type": "HIVE",
+          "version": "1.2",
+          "predicates": [
+            {
+              "id": 6,
+              "type": "file.exist",
+              "config": {
+                "root.path": "hdfs:///127.0.0.1/demo_src",
+                "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
+              }
             }
-        ],
-        "evaluate.rule": {
-            "id": 2,
-            "rules": [{
-                    "id": 3,
-                    "rule": "source.desc=target.desc",
-                    "dsl.type": "griffin-dsl",
-                    "dq.type": "ACCURACY"
-                }
-            ]
+          ],
+          "data.unit": "1hour",
+          "data.time.zone": "UTC(WET,GMT)",
+          "config": {
+            "database": "default",
+            "table.name": "demo_src",
+            "where": "dt=#YYYYMMdd# AND hour=#HH#"
+          }
         },
-        "measure.type": "griffin"
+        "baseline": false
+      },
+      {
+        "id": 7,
+        "name": "target",
+        "connector": {
+          "id": 8,
+          "name": "connector_name_target",
+          "type": "HIVE",
+          "version": "1.2",
+          "predicates": [
+            {
+              "id": 9,
+              "type": "file.exist",
+              "config": {
+                "root.path": "hdfs:///127.0.0.1/demo_src",
+                "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
+              }
+            }
+          ],
+          "data.unit": "1hour",
+          "data.time.zone": "UTC(WET,GMT)",
+          "config": {
+            "database": "default",
+            "table.name": "demo_tgt",
+            "where": "dt=#YYYYMMdd# AND hour=#HH#"
+          }
+        },
+        "baseline": false
+      }
+    ],
+    "evaluate.rule": {
+      "id": 2,
+      "rules": [
+        {
+          "id": 3,
+          "rule": "source.desc=target.desc",
+          "dsl.type": "griffin-dsl",
+          "dq.type": "ACCURACY"
+        }
+      ]
     }
+  }
 ]
 ```
 
@@ -422,64 +422,69 @@
 curl -k -H "Content-Type: application/json" -H "Accept: application/json" \
 -X PUT http://127.0.0.1:8080/api/v1/measures \
 -d '{
-    "measure.type": "griffin",
-    "id": 19,
-    "name": "profiling_measure_edited",
-    "owner": "test",
-    "description": "measure description",
-    "deleted": false,
-    "dq.type": "PROFILING",
-    "sinks": ["ELASTICSEARCH", "HDFS"],
-    "process.type": "BATCH",
-    "rule.description": {
-        "details": [{
-                "name": "age",
-                "infos": "Total Count,Average"
-            }
+      "measure.type": "griffin",
+      "id": 19,
+      "name": "profiling_measure_edited",
+      "owner": "test",
+      "description": "measure description",
+      "deleted": false,
+      "dq.type": "PROFILING",
+      "sinks": [
+        "ELASTICSEARCH",
+        "HDFS"
+      ],
+      "process.type": "BATCH",
+      "rule.description": {
+        "details": [
+          {
+            "name": "age",
+            "infos": "Total Count,Average"
+          }
         ]
-    },
-    "data.sources": [{
-            "id": 22,
-            "name": "source",
-            "connectors": [{
-                    "id": 23,
-                    "name": "connector_name",
-                    "type": "HIVE",
-                    "version": "1.2",
-                    "predicates": [{
-                            "id": 24,
-                            "type": "file.exist",
-                            "config": {
-                                "root.path": "hdfs:///griffin/demo_src",
-                                "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
-                            }
-                        }
-                    ],
-                    "data.unit": "1hour",
-                    "data.time.zone": "UTC(WET,GMT)",
-                    "config": {
-                        "database": "default",
-                        "table.name": "demo_src",
-                        "where": "dt=#YYYYMMdd# AND hour=#HH#"
-                    }
+      },
+      "data.sources": [
+        {
+          "id": 22,
+          "name": "source",
+          "connector": {
+            "id": 23,
+            "name": "connector_name",
+            "type": "HIVE",
+            "version": "1.2",
+            "predicates": [
+              {
+                "id": 24,
+                "type": "file.exist",
+                "config": {
+                  "root.path": "hdfs:///griffin/demo_src",
+                  "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
                 }
+              }
             ],
-            "baseline": false
-        }
-    ],
-    "evaluate.rule": {
-        "id": 20,
-        "rules": [{
-                "id": 21,
-                "rule": "count(source.`age`) AS `age-count`,avg(source.`age`) AS `age-average`",
-                "dsl.type": "griffin-dsl",
-                "dq.type": "PROFILING",
-                "details": {}
+            "data.unit": "1hour",
+            "data.time.zone": "UTC(WET,GMT)",
+            "config": {
+              "database": "default",
+              "table.name": "demo_src",
+              "where": "dt=#YYYYMMdd# AND hour=#HH#"
             }
+          },
+          "baseline": false
+        }
+      ],
+      "evaluate.rule": {
+        "id": 20,
+        "rules": [
+          {
+            "id": 21,
+            "rule": "count(source.`age`) AS `age-count`,avg(source.`age`) AS `age-average`",
+            "dsl.type": "griffin-dsl",
+            "dq.type": "PROFILING",
+            "details": {}
+          }
         ]
-    },
-    "measure.type": "griffin"
-}'
+      }
+    }'
 ```
 Here is an example to update external measure:
 ```
diff --git a/measure/src/main/resources/config-batch-advanced.json b/measure/src/main/resources/config-batch-advanced.json
index 9e98efa..c6740e9 100644
--- a/measure/src/main/resources/config-batch-advanced.json
+++ b/measure/src/main/resources/config-batch-advanced.json
@@ -1,35 +1,29 @@
 {
   "name": "accu_batch",
-
   "process.type": "batch",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
-    }, {
+      }
+    },
+    {
       "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_target.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_target.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -44,7 +38,7 @@
           "total": "total_count",
           "matched": "matched_count"
         },
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "accu"
@@ -57,5 +51,8 @@
       }
     ]
   },
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
diff --git a/measure/src/main/resources/config-batch-path.json b/measure/src/main/resources/config-batch-path.json
index 6aab127..883a76b 100644
--- a/measure/src/main/resources/config-batch-path.json
+++ b/measure/src/main/resources/config-batch-path.json
@@ -1,35 +1,29 @@
 {
   "name": "accu_batch",
-
   "process.type": "batch",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.path": "measure/src/test/resources/users_info_src"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.path": "measure/src/test/resources/users_info_src"
         }
-      ]
-    }, {
+      }
+    },
+    {
       "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.path": "measure/src/test/resources/users_info_target"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.path": "measure/src/test/resources/users_info_target"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -40,5 +34,8 @@
       }
     ]
   },
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch.json
index 69ad485..d7bc337 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch.json
@@ -1,35 +1,29 @@
 {
   "name": "accu_batch",
-
   "process.type": "batch",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "measure/src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "measure/src/test/resources/users_info_src.avro"
         }
-      ]
-    }, {
+      }
+    },
+    {
       "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "measure/src/test/resources/users_info_target.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "measure/src/test/resources/users_info_target.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -40,5 +34,8 @@
       }
     ]
   },
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json
index 1be7a0a..9828984 100644
--- a/measure/src/main/resources/config-streaming.json
+++ b/measure/src/main/resources/config-streaming.json
@@ -1,54 +1,52 @@
 {
   "name": "prof_streaming",
-
   "process.type": "streaming",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name" : "kafka",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "kafka",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.147.177.107:9092",
+            "group.id": "group1",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "kafka",
-              "out.dataframe.name": "out1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "in.dataframe.name":"out1",
-              "out.datafrmae.name": "out3",
-              "rule": "select name, age from out1"
-            }
-          ]
-        }
-      ],
+          "topics": "sss",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "kafka",
+            "out.dataframe.name": "out1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "in.dataframe.name": "out1",
+            "out.datafrmae.name": "out3",
+            "rule": "select name, age from out1"
+          }
+        ]
+      },
       "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"]
+        "time.range": [
+          "0",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -56,7 +54,7 @@
         "dq.type": "profiling",
         "out.dataframe.name": "prof",
         "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "prof"
@@ -68,7 +66,7 @@
         "dq.type": "profiling",
         "out.dataframe.name": "grp",
         "rule": "select name, count(*) as `cnt` from source group by name",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "name_group",
@@ -78,5 +76,7 @@
       }
     ]
   },
-  "sinks": ["ELASTICSEARCH"]
+  "sinks": [
+    "ELASTICSEARCH"
+  ]
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 993f432..9264185 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -21,13 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
 
-import org.apache.griffin.measure.configuration.enums.{
-  DqType,
-  DslType,
-  FlattenType,
-  OutputType,
-  SinkType
-}
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.enums.DqType._
 import org.apache.griffin.measure.configuration.enums.DslType.{DslType, GriffinDsl}
 import org.apache.griffin.measure.configuration.enums.FlattenType.{
@@ -86,31 +80,31 @@
  * data source param
  * @param name         data source name (must)
  * @param baseline     data source is baseline or not, false by default (optional)
- * @param connectors   data connectors (optional)
+ * @param connector    data connector (optional)
  * @param checkpoint   data source checkpoint configuration (must in streaming mode with streaming connectors)
  */
 @JsonInclude(Include.NON_NULL)
 case class DataSourceParam(
     @JsonProperty("name") private val name: String,
-    @JsonProperty("connectors") private val connectors: List[DataConnectorParam],
+    @JsonProperty("connector") private val connector: DataConnectorParam,
     @JsonProperty("baseline") private val baseline: Boolean = false,
     @JsonProperty("checkpoint") private val checkpoint: Map[String, Any] = null)
     extends Param {
   def getName: String = name
   def isBaseline: Boolean = if (Option(baseline).isDefined) baseline else false
-  def getConnectors: Seq[DataConnectorParam] = if (connectors != null) connectors else Nil
+  def getConnector: Option[DataConnectorParam] = Option(connector)
   def getCheckpointOpt: Option[Map[String, Any]] = Option(checkpoint)
 
   def validate(): Unit = {
     assert(StringUtils.isNotBlank(name), "data source name should not be empty")
-    getConnectors.foreach(_.validate())
+    assert(getConnector.isDefined, "Connector is undefined or invalid")
+    getConnector.foreach(_.validate())
   }
 }
 
 /**
  * data connector param
  * @param conType    data connector type, e.g.: hive, avro, kafka (must)
- * @param version    data connector type version (optional)
  * @param dataFrameName    data connector dataframe name, for pre-process input usage (optional)
  * @param config     detail configuration of data connector (must)
  * @param preProc    pre-process rules after load data (optional)
@@ -118,13 +112,11 @@
 @JsonInclude(Include.NON_NULL)
 case class DataConnectorParam(
     @JsonProperty("type") private val conType: String,
-    @JsonProperty("version") private val version: String,
     @JsonProperty("dataframe.name") private val dataFrameName: String,
     @JsonProperty("config") private val config: Map[String, Any],
     @JsonProperty("pre.proc") private val preProc: List[RuleParam])
     extends Param {
   def getType: String = conType
-  def getVersion: String = if (version != null) version else ""
   def getDataFrameName(defName: String): String =
     if (dataFrameName != null) dataFrameName else defName
   def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 872deb1..9eff4d0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -30,13 +30,13 @@
  * data source
  * @param name     name of data source
  * @param dsParam  param of this data source
- * @param dataConnectors       list of data connectors
+ * @param dataConnector       data connector
  * @param streamingCacheClientOpt   streaming data cache client option
  */
 case class DataSource(
     name: String,
     dsParam: DataSourceParam,
-    dataConnectors: Seq[DataConnector],
+    dataConnector: Option[DataConnector],
     streamingCacheClientOpt: Option[StreamingCacheClient])
     extends Loggable
     with Serializable {
@@ -44,7 +44,7 @@
   val isBaseline: Boolean = dsParam.isBaseline
 
   def init(): Unit = {
-    dataConnectors.foreach(_.init())
+    dataConnector.foreach(_.init())
   }
 
   def loadData(context: DQContext): TimeRange = {
@@ -67,7 +67,7 @@
   }
 
   private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
-    val batches = dataConnectors.flatMap { dc =>
+    val batches = dataConnector.flatMap { dc =>
       val (dfOpt, timeRange) = dc.data(timestamp)
       dfOpt match {
         case Some(_) => Some((dfOpt, timeRange))
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 67d9544..7049063 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -25,7 +25,7 @@
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
 import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
-import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.datasource.connector.DataConnectorFactory
 
 object DataSourceFactory extends Loggable {
 
@@ -45,7 +45,6 @@
       dataSourceParam: DataSourceParam,
       index: Int): Option[DataSource] = {
     val name = dataSourceParam.getName
-    val connectorParams = dataSourceParam.getConnectors
     val timestampStorage = TimestampStorage()
 
     // for streaming data cache
@@ -56,19 +55,23 @@
       index,
       timestampStorage)
 
-    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
-      DataConnectorFactory.getDataConnector(
-        sparkSession,
-        ssc,
-        connectorParam,
-        timestampStorage,
-        streamingCacheClientOpt) match {
-        case Success(connector) => Some(connector)
-        case _ => None
-      }
-    }
+    val connectorParamOpt = dataSourceParam.getConnector
 
-    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
+    connectorParamOpt match {
+      case Some(connectorParam) =>
+        val dataConnector = DataConnectorFactory.getDataConnector(
+          sparkSession,
+          ssc,
+          connectorParam,
+          timestampStorage,
+          streamingCacheClientOpt) match {
+          case Success(connector) => Some(connector)
+          case _ => None
+        }
+
+        Some(DataSource(name, dataSourceParam, dataConnector, streamingCacheClientOpt))
+      case None => None
+    }
   }
 
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
index baa5639..a6bc7cb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
@@ -31,11 +31,14 @@
 
   def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
     val name = getStepName(param.getName)
-    val steps = param.getConnectors.flatMap { dc =>
-      buildReadSteps(context, dc)
+
+    param.getConnector match {
+      case Some(dc) =>
+        val steps = buildReadSteps(context, dc)
+        if (steps.isDefined) Some(UnionReadStep(name, Seq(steps.get)))
+        else None
+      case _ => None
     }
-    if (steps.nonEmpty) Some(UnionReadStep(name, steps))
-    else None
   }
 
   protected def buildReadSteps(context: DQContext, dcParam: DataConnectorParam): Option[ReadStep]
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json
index 149c839..19a49c3 100644
--- a/measure/src/test/resources/_accuracy-batch-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -1,35 +1,27 @@
 {
   "name": "accu_batch",
-
   "process.type": "batch",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
-    }, {
+      }
+    },
+    {
       "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_target.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "config": {
+          "file.name": "src/test/resources/users_info_target.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -44,7 +36,7 @@
           "total": "total_count",
           "matched": "matched_count"
         },
-        "out":[
+        "out": [
           {
             "type": "record",
             "name": "missRecords"
@@ -53,6 +45,8 @@
       }
     ]
   },
-
-  "sinks": ["LOG", "ELASTICSEARCH"]
+  "sinks": [
+    "LOG",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
index 9ce25df..c7c1095 100644
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -1,96 +1,96 @@
 {
   "name": "accu_streaming",
-
   "process.type": "streaming",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name": "this",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "this",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.147.177.107:9092",
+            "group.id": "group1",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "sss",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "checkpoint": {
         "type": "parquet",
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-2m", "0"],
+        "time.range": [
+          "-2m",
+          "0"
+        ],
         "updatable": true
       }
-    }, {
+    },
+    {
       "name": "target",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name": "this",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "this",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.147.177.107:9092",
+            "group.id": "group1",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "t1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from t1"
-            }
-          ]
-        }
-      ],
+          "topics": "ttt",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "t1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from t1"
+          }
+        ]
+      },
       "checkpoint": {
         "type": "parquet",
         "file.path": "hdfs://localhost/griffin/streaming/dump/target",
         "info.path": "target",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-2m", "0"]
+        "time.range": [
+          "-2m",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -105,19 +105,21 @@
           "total": "total_count",
           "matched": "matched_count"
         },
-        "out":[
+        "out": [
           {
-            "type":"metric",
+            "type": "metric",
             "name": "accu"
           },
           {
-            "type":"record",
+            "type": "record",
             "name": "missRecords"
           }
         ]
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
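
Note: every JSON test resource below receives the same mechanical rewrite, unwrapping the single-element `connectors` array into a singular `connector` object. Existing job configs can be migrated in one pass; the following sketch assumes json4s and a hypothetical `migrateConnectors` helper, neither of which is part of this patch.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical one-off migration for existing job configs: unwrap each
// single-element "connectors" array into the new singular "connector" object.
// A source that declared several connectors has no single-object equivalent
// and would need to be split into separate data sources by hand.
def migrateConnectors(config: String): String = {
  val migrated = parse(config).transformField {
    case ("connectors", JArray(connector :: Nil)) => ("connector", connector)
  }
  pretty(render(migrated))
}
```
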
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
index 4f42092..c757624 100644
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -1,25 +1,19 @@
 {
   "name": "comp_batch",
-
   "process.type": "batch",
-
   "timestamp": 123456,
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -27,7 +21,7 @@
         "dq.type": "completeness",
         "out.dataframe.name": "comp",
         "rule": "email, post_code, first_name",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "comp"
@@ -36,6 +30,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
index 53e1e7b..114c12d 100644
--- a/measure/src/test/resources/_completeness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -1,52 +1,50 @@
 {
   "name": "comp_streaming",
-
   "process.type": "streaming",
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name": "this",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "source",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "test",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "this",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.147.177.107:9092",
+            "group.id": "source",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "test",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"]
+        "time.range": [
+          "0",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -54,7 +52,7 @@
         "dq.type": "completeness",
         "out.dataframe.name": "comp",
         "rule": "name, age",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "comp"
@@ -63,6 +61,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
index e3b1f1c..cfe5326 100644
--- a/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
@@ -1,17 +1,15 @@
 {
   "data.sources": [
     {
-      "connectors": [
-        {
-          "dataframe.name": "prof_table",
-          "config": {
-            "table.name": "efg",
-            "database": "abc",
-            "where": "`date`=\"20190825\""
-          },
-          "type": "hive"
-        }
-      ],
+      "connector": {
+        "dataframe.name": "prof_table",
+        "config": {
+          "table.name": "efg",
+          "database": "abc",
+          "where": "`date`=\"20190825\""
+        },
+        "type": "hive"
+      },
       "name": "source"
     }
   ],
@@ -26,16 +24,23 @@
         "out.dataframe.name": "prof",
         "dsl.type": "griffin-dsl",
         "dq.type": "completeness",
-        "error.confs":[
+        "error.confs": [
           {
             "column.name": "user",
             "type": "enumeration",
-            "values":["1", "2", "hive_none", ""]
+            "values": [
+              "1",
+              "2",
+              "hive_none",
+              ""
+            ]
           },
           {
             "column.name": "name",
             "type": "regex",
-            "values":["^zhanglei.natur\\w{1}$"]
+            "values": [
+              "^zhanglei.natur\\w{1}$"
+            ]
           }
         ],
         "out": [
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json
index d946089..bef2b50 100644
--- a/measure/src/test/resources/_distinctness-batch-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -1,37 +1,30 @@
 {
   "name": "dist_batch",
-
   "process.type": "batch",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
+      }
     },
     {
       "name": "target",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -48,7 +41,7 @@
           "num": "num",
           "duplication.array": "dup"
         },
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "distinct"
@@ -57,6 +50,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
index e3629d1..88540e4 100644
--- a/measure/src/test/resources/_distinctness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
@@ -1,8 +1,6 @@
 {
   "name": "dist_streaming",
-
   "process.type": "streaming",
-
   "data.sources": [
     {
       "name": "new",
@@ -11,53 +9,56 @@
         "info.path": "new",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"],
+        "time.range": [
+          "0",
+          "0"
+        ],
         "read.only": true
       }
     },
     {
       "name": "old",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name": "this",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "old",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "this",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.149.247.156:9092",
+            "group.id": "old",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "ttt",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/old",
         "info.path": "old",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-24h", "0"]
+        "time.range": [
+          "-24h",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -75,7 +76,7 @@
           "num": "num",
           "duplication.array": "dup"
         },
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "distinct"
@@ -84,6 +85,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
index 70cc369..d7df301 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
@@ -1,26 +1,20 @@
 {
   "name": "prof_batch",
-
   "process.type": "batch",
-
   "timestamp": 123456,
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "hive",
-          "version": "1.2",
-          "config": {
-            "database": "default",
-            "table.name": "s1"
-          }
+      "connector": {
+        "type": "hive",
+        "version": "1.2",
+        "config": {
+          "database": "default",
+          "table.name": "s1"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -28,7 +22,7 @@
         "dq.type": "profiling",
         "out.dataframe.name": "prof",
         "rule": "name, count(*) as cnt from source group by name",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "name_group",
@@ -41,7 +35,7 @@
         "dq.type": "profiling",
         "out.dataframe.name": "grp",
         "rule": "age, count(*) as cnt from source group by age order by cnt",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "age_group",
@@ -51,6 +45,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json
index 273d2e4..e1df3da 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -1,32 +1,26 @@
 {
   "name": "prof_batch",
-
   "process.type": "batch",
-
   "timestamp": 123456,
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "dataframe.name" : "this_table",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "rule": "select * from this_table where user_id < 10014"
-            }
-          ]
-        }
-      ]
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "dataframe.name": "this_table",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "spark-sql",
+            "rule": "select * from this_table where user_id < 10014"
+          }
+        ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -34,7 +28,7 @@
         "dq.type": "profiling",
         "out.dataframe.name": "prof",
         "rule": "user_id, count(*) as cnt from source group by user_id",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "prof",
@@ -47,7 +41,7 @@
         "dq.type": "profiling",
         "out.dataframe.name": "grp",
         "rule": "source.post_code, count(*) as cnt from source group by source.post_code order by cnt desc",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "post_group",
@@ -57,6 +51,7 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE"]
+  "sinks": [
+    "CONSOLE"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json b/measure/src/test/resources/_profiling-batch-sparksql.json
index c8077d0..eaf0f89 100644
--- a/measure/src/test/resources/_profiling-batch-sparksql.json
+++ b/measure/src/test/resources/_profiling-batch-sparksql.json
@@ -1,25 +1,19 @@
 {
   "name": "prof_batch",
-
   "process.type": "batch",
-
   "timestamp": 123456,
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -47,6 +41,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json
index 1d28745..efe0929 100644
--- a/measure/src/test/resources/_profiling-streaming-griffindsl.json
+++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json
@@ -1,52 +1,50 @@
 {
   "name": "prof_streaming",
-
   "process.type": "streaming",
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name": "this",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "test",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "this",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.147.177.107:9092",
+            "group.id": "group1",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "test",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"]
+        "time.range": [
+          "0",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -54,7 +52,7 @@
         "dq.type": "profiling",
         "out.dataframe.name": "prof",
         "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "prof"
@@ -66,7 +64,7 @@
         "dq.type": "profiling",
         "out.dataframe.name": "grp",
         "rule": "select name, count(*) as `cnt` from source group by name",
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "name_group",
@@ -76,6 +74,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json
index f3759ca..f9be0fa 100644
--- a/measure/src/test/resources/_timeliness-batch-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json
@@ -1,23 +1,18 @@
 {
   "name": "timeliness_batch",
-
   "process.type": "batch",
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/timeliness_data.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/timeliness_data.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -35,9 +30,11 @@
           "count": "cnt",
           "step.size": "2m",
           "percentile": "percentile",
-          "percentile.values": [0.95]
+          "percentile.values": [
+            0.95
+          ]
         },
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "timeliness"
@@ -50,6 +47,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
index 1663122..360ea88 100644
--- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
@@ -1,52 +1,50 @@
 {
   "name": "timeliness_streaming",
-
   "process.type": "streaming",
-
   "data.sources": [
     {
       "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name": "this",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "fff",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "this",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.149.247.156:9092",
+            "group.id": "group1",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "fff",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"]
+        "time.range": [
+          "0",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -64,9 +62,13 @@
           "count": "cnt",
           "step.size": "5m",
           "percentile": "percentile",
-          "percentile.values": [0.2, 0.5, 0.8]
+          "percentile.values": [
+            0.2,
+            0.5,
+            0.8
+          ]
         },
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "timeliness"
@@ -79,6 +81,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
index 2c32930..fe5eef5 100644
--- a/measure/src/test/resources/_uniqueness-batch-griffindsl.json
+++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
@@ -1,36 +1,29 @@
 {
   "name": "unique_batch",
-
   "process.type": "batch",
-
   "data.sources": [
     {
       "name": "source",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
+      }
     },
     {
       "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
+      "connector": {
+        "type": "avro",
+        "version": "1.7",
+        "config": {
+          "file.name": "src/test/resources/users_info_src.avro"
         }
-      ]
+      }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -46,7 +39,7 @@
           "dup": "dup",
           "num": "num"
         },
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "unique"
@@ -59,6 +52,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
index a4f4dcc..7c57748 100644
--- a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
@@ -1,94 +1,93 @@
 {
   "name": "unique_streaming",
-
   "process.type": "streaming",
-
   "data.sources": [
     {
       "name": "new",
       "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name": "this",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "new",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "this",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.149.247.156:9092",
+            "group.id": "new",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "ttt",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/new",
         "info.path": "new",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"]
+        "time.range": [
+          "0",
+          "0"
+        ]
       }
     },
     {
       "name": "old",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "dataframe.name": "this",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "old",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
+      "connector": {
+        "type": "kafka",
+        "version": "0.8",
+        "dataframe.name": "this",
+        "config": {
+          "kafka.config": {
+            "bootstrap.servers": "10.149.247.156:9092",
+            "group.id": "old",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
           },
-          "pre.proc": [
-            {
-              "dsl.type": "df-ops",
-              "in.dataframe.name": "this",
-              "out.dataframe.name": "s1",
-              "rule": "from_json"
-            },
-            {
-              "dsl.type": "spark-sql",
-              "out.dataframe.name": "this",
-              "rule": "select name, age from s1"
-            }
-          ]
-        }
-      ],
+          "topics": "ttt",
+          "key.type": "java.lang.String",
+          "value.type": "java.lang.String"
+        },
+        "pre.proc": [
+          {
+            "dsl.type": "df-ops",
+            "in.dataframe.name": "this",
+            "out.dataframe.name": "s1",
+            "rule": "from_json"
+          },
+          {
+            "dsl.type": "spark-sql",
+            "out.dataframe.name": "this",
+            "rule": "select name, age from s1"
+          }
+        ]
+      },
       "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/old",
         "info.path": "old",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-24h", "0"]
+        "time.range": [
+          "-24h",
+          "0"
+        ]
       }
     }
   ],
-
   "evaluate.rule": {
     "rules": [
       {
@@ -105,7 +104,7 @@
           "num": "num",
           "duplication.array": "dup"
         },
-        "out":[
+        "out": [
           {
             "type": "metric",
             "name": "unique"
@@ -118,6 +117,8 @@
       }
     ]
   },
-
-  "sinks": ["CONSOLE","ELASTICSEARCH"]
+  "sinks": [
+    "CONSOLE",
+    "ELASTICSEARCH"
+  ]
 }
\ No newline at end of file
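
The Scala test changes below simply drop the removed `version` argument from each `DataConnectorParam` construction. For orientation, the post-refactor shape of the case class, inferred from the named arguments used in `AccuracyTransformationsIntegrationTest`, is roughly:

```scala
// Sketch only: field names and order are inferred from the call sites in
// this patch; RuleParam is the existing rule configuration type in the
// measure module, referenced here without its definition.
case class DataConnectorParam(
    conType: String,
    dataFrameName: String,
    config: Map[String, Any],
    preProc: List[RuleParam])
```
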
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
index 2537931..466196b 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
@@ -81,7 +81,6 @@
     val param = DataConnectorParam(
       "CUSTOM",
       null,
-      null,
       Map("class" -> classOf[ExampleBatchDataConnector].getCanonicalName),
       Nil)
     // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
@@ -96,7 +95,6 @@
     val param = DataConnectorParam(
       "CUSTOM",
       null,
-      null,
       Map("class" -> classOf[MySqlDataConnector].getCanonicalName),
       Nil)
     // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
@@ -109,7 +107,6 @@
     val param = DataConnectorParam(
       "CUSTOM",
       null,
-      null,
       Map("class" -> classOf[KafkaStreamingStringDataConnector].getCanonicalName),
       Nil)
     val res = DataConnectorFactory.getDataConnector(null, null, param, null, None)
@@ -121,7 +118,6 @@
     val param = DataConnectorParam(
       "CUSTOM",
       null,
-      null,
       Map("class" -> classOf[NotDataConnector].getCanonicalName),
       Nil)
     // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
@@ -138,7 +134,6 @@
     val param = DataConnectorParam(
       "CUSTOM",
       null,
-      null,
       Map("class" -> classOf[DataConnectorWithoutApply].getCanonicalName),
       Nil)
     // apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
index 4cf2028..3df026a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -55,7 +55,7 @@
   }
 
   private final val dcParam =
-    DataConnectorParam("file", "1", "test_df", Map.empty[String, String], Nil)
+    DataConnectorParam("file", "test_df", Map.empty[String, String], Nil)
   private final val timestampStorage = TimestampStorage()
 
   // Regarding Local FileSystem
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
index e707ef7..eec2e5e 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -28,7 +28,7 @@
 class JDBCBasedDataConnectorTest extends SparkSuiteBase with Matchers {
 
   val url = "jdbc:h2:mem:test"
-  var conn: java.sql.Connection = null
+  var conn: java.sql.Connection = _
   val properties = new Properties()
   properties.setProperty("user", "user")
   properties.setProperty("password", "password")
@@ -51,7 +51,7 @@
   }
 
   private final val dcParam =
-    DataConnectorParam("jdbc", "1", "test_df", Map.empty[String, String], Nil)
+    DataConnectorParam("jdbc", "test_df", Map.empty[String, String], Nil)
   private final val timestampStorage = TimestampStorage()
 
   "JDBC based data connector" should "be able to read data from relational database" in {
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index ddfc95b..2f06ad3 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -82,12 +82,8 @@
       expectedResult: AccuracyResult) = {
     val dqContext: DQContext = getDqContext(
       dataSourcesParam = List(
-        DataSourceParam(
-          name = "source",
-          connectors = List(dataConnectorParam(tableName = sourceName))),
-        DataSourceParam(
-          name = "target",
-          connectors = List(dataConnectorParam(tableName = targetName)))))
+        DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
+        DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
 
     val accuracyRule = RuleParam(
       dslType = "griffin-dsl",
@@ -161,7 +157,6 @@
   private def dataConnectorParam(tableName: String) = {
     DataConnectorParam(
       conType = "HIVE",
-      version = null,
       dataFrameName = null,
       config = Map("table.name" -> tableName),
       preProc = null)
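
Together with the `DataSourceParam` change above, wiring a source programmatically now takes a single connector (a sketch; the table name is illustrative and `DataSourceParam`'s other fields are elided):

```scala
// Mirrors the updated AccuracyTransformationsIntegrationTest: one connector
// per data source instead of a list.
val hiveConnector = DataConnectorParam(
  conType = "HIVE",
  dataFrameName = null,
  config = Map("table.name" -> "users_info_src"),
  preProc = null)

val source = DataSourceParam(name = "source", connector = hiveConnector)
```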