[GRIFFIN-323] Refactor Data Source Connector configuration
**What changes were proposed in this pull request?**
This PR proposes the following changes:
- Remove `version` from `DataConnectorParam`, as it is not used anywhere in the codebase.
- Change `connectors` from an array to a single JSON object: a given data source can only be of one type (Hive, file, etc.), so the connector field should not be an array. See the before/after sketch below.
- Rename `connectors` to `connector`.
- Update the existing config files and documentation accordingly.
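
A minimal before/after sketch of the new layout, based on the batch accuracy sample config updated in this diff:
```
"data.sources": [
  {
    "name": "source",
    "connectors": [
      {
        "type": "AVRO",
        "version": "1.7",
        "config": {
          "file.name": "src/test/resources/users_info_src.avro"
        }
      }
    ]
  }
]
```
becomes
```
"data.sources": [
  {
    "name": "source",
    "connector": {
      "type": "AVRO",
      "config": {
        "file.name": "src/test/resources/users_info_src.avro"
      }
    }
  }
]
```
Note that `version` is dropped in the second snippet, since it is no longer read.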
**Does this PR introduce any user-facing change?**
Yes. As described above, the structure of the data source configuration has changed.
**How was this patch tested?**
Griffin test suite.
Author: chitralverma <chitralverma@gmail.com>
Closes #568 from chitralverma/refactor-data-connector-config.
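
For downstream Scala code, the key API change is `DataSourceParam.getConnectors: Seq[DataConnectorParam]` becoming `getConnector: Option[DataConnectorParam]`. A hypothetical caller-side sketch (the `describe` helper is illustrative, not part of this patch):
```
// Callers that previously iterated over a Seq of connectors now
// pattern-match on a single optional connector.
def describe(dsParam: DataSourceParam): String =
  dsParam.getConnector match {
    case Some(dc) => s"data source '${dsParam.getName}' uses connector type '${dc.getType}'"
    case None     => s"data source '${dsParam.getName}' has no connector configured"
  }
```
Note that `validate()` now asserts the connector is defined, so a missing connector fails configuration validation instead of being silently skipped.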
diff --git a/griffin-doc/measure/measure-batch-sample.md b/griffin-doc/measure/measure-batch-sample.md
index f45d040..1aee173 100644
--- a/griffin-doc/measure/measure-batch-sample.md
+++ b/griffin-doc/measure/measure-batch-sample.md
@@ -24,36 +24,30 @@
```
{
"name": "accu_batch",
-
"process.type": "BATCH",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "AVRO",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "AVRO",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
- }, {
+ }
+ },
+ {
"name": "target",
- "connectors": [
- {
- "type": "AVRO",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_target.avro"
- }
+ "connector": {
+ "type": "AVRO",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_target.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -76,13 +70,15 @@
{
"type": "record",
"name": "missRecords"
- }
- ]
+ }
+ ]
}
]
},
-
- "sinks": ["CONSOLE", "ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
```
Above is the configure file of batch accuracy job.
@@ -99,25 +95,20 @@
```
{
"name": "prof_batch",
-
"process.type": "BATCH",
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "HIVE",
- "version": "1.2",
- "config": {
- "database": "default",
- "table.name": "src"
- }
+ "connector": {
+ "type": "HIVE",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "src"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -129,8 +120,8 @@
{
"type": "metric",
"name": "prof"
- }
- ]
+ }
+ ]
},
{
"dsl.type": "griffin-dsl",
@@ -142,13 +133,15 @@
"type": "metric",
"name": "name_grp",
"flatten": "array"
- }
+ }
]
}
]
},
-
- "sinks": ["CONSOLE", "ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
```
Above is the configure file of batch profiling job.
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index 4d7594c..e286ab1 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -120,37 +120,31 @@
```
{
"name": "accu_batch",
-
"process.type": "BATCH",
-
"data.sources": [
{
"name": "src",
- "connectors": [
- {
- "type": "AVRO",
- "version": "1.7",
- "config": {
- "file.path": "<path>/<to>",
- "file.name": "<source-file>.avro"
- }
+ "connector": {
+ "type": "AVRO",
+ "version": "1.7",
+ "config": {
+ "file.path": "<path>/<to>",
+ "file.name": "<source-file>.avro"
}
- ]
- }, {
+ }
+ },
+ {
"name": "tgt",
- "connectors": [
- {
- "type": "AVRO",
- "version": "1.7",
- "config": {
- "file.path": "<path>/<to>",
- "file.name": "<target-file>.avro"
- }
+ "connector": {
+ "type": "AVRO",
+ "version": "1.7",
+ "config": {
+ "file.path": "<path>/<to>",
+ "file.name": "<target-file>.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -164,7 +158,7 @@
"miss": "miss_count",
"total": "total_count",
"matched": "matched_count"
- },
+ },
"out": [
{
"type": "metric",
@@ -172,13 +166,16 @@
},
{
"type": "record"
- }
+ }
]
}
]
},
-
- "sinks": ["CONSOLE", "HTTP", "HDFS"]
+ "sinks": [
+ "CONSOLE",
+ "HTTP",
+ "HDFS"
+ ]
}
```
Above lists DQ job configure parameters.
@@ -187,7 +184,7 @@
- **process.type**: Process type of DQ job, "BATCH" or "STREAMING".
- **data.sources**: List of data sources in this DQ job.
+ name: Name of this data source, it should be different from other data sources.
- + connectors: List of data connectors combined as the same data source. Details of data connector configuration [here](#data-connector).
+ + connector: Data connector for this data source. Details of data connector configuration [here](#data-connector).
- **evaluate.rule**: Evaluate rule parameters of this DQ job.
+ dsl.type: Default dsl type of all the rules.
+ rules: List of rules, to define every rule step. Details of rule configuration [here](#rule).
@@ -195,7 +192,7 @@
### Data Connector
-Data Connectors help connector to external sources on which DQ checks can be applied.
+A data connector helps connect to an external source on which DQ checks can be applied.
List of supported data connectors:
- Hive
@@ -208,8 +205,7 @@
A sample data connector configuration is as following,
```
-"connectors": [
- {
+"connector": {
"type": "file",
"version": "1.7",
"config": {
@@ -217,7 +213,6 @@
"key2": "value2"
}
}
-]
```
##### Key Parameters:
@@ -234,15 +229,13 @@
+ For **Streaming** it should implement StreamingDataConnector trait.
- Example:
```
- "connectors": [
- {
+ "connector": {
"type": "custom",
"config": {
"class": "org.apache.griffin.measure.datasource.connector.batch.CassandraDataConnector",
...
}
}
- ]
```
**Note:** User-provided data connector should be present in Spark job's class path, by either providing custom jar with
@@ -320,20 +313,32 @@
- Example:
```
- "connectors": [
- {
+ "connector": {
"type": "file",
"config": {
"format": "csv",
- "paths": ["/path/to/csv/dir/*", "/path/to/dir/test.csv"],
+ "paths": [
+ "/path/to/csv/dir/*",
+ "/path/to/dir/test.csv"
+ ],
"options": {
- "header": "true"
- },
+ "header": "true"
+ },
"skipOnError": "false",
- "schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]
+ "schema": [
+ {
+ "name": "user_id",
+ "type": "string",
+ "nullable": "true"
+ },
+ {
+ "name": "age",
+ "type": "int",
+ "nullable": "false"
+ }
+ ]
}
}
- ]
**Note:** Additional examples of schema:
- "schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]
@@ -362,8 +367,7 @@
- Example:
```
- "connectors": [
- {
+ "connector": {
"type": "jdbc",
"config": {
"database": "default",
@@ -374,8 +378,7 @@
"driver": "com.mysql.jdbc.Driver",
"where": ""
}
- }
- ]
+ }
**Note:** Jar containing driver class should be present in Spark job's class path, by either providing custom jar with
`--jars` parameter to spark-submit or by adding setting `spark.jars` in `spark -> config` section of environment config.
diff --git a/griffin-doc/measure/measure-streaming-sample.md b/griffin-doc/measure/measure-streaming-sample.md
index 9e47143..b942778 100644
--- a/griffin-doc/measure/measure-streaming-sample.md
+++ b/griffin-doc/measure/measure-streaming-sample.md
@@ -24,93 +24,93 @@
```
{
"name": "accu_streaming",
-
"process.type": "STREAMING",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "KAFKA",
- "version": "0.8",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
- "group.id": "src_group",
- "auto.offset.reset": "largest",
- "auto.commit.enable": "false"
- },
- "topics": "sss",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "KAFKA",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "src_group",
+ "auto.offset.reset": "largest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "sss",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"cache": {
"file.path": "hdfs://localhost/griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["-2m", "0"],
+ "time.range": [
+ "-2m",
+ "0"
+ ],
"updatable": true
}
- }, {
+ },
+ {
"name": "target",
- "connectors": [
- {
- "type": "KAFKA",
- "version": "0.8",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
- "group.id": "tgt_group",
- "auto.offset.reset": "largest",
- "auto.commit.enable": "false"
- },
- "topics": "ttt",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "KAFKA",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "tgt_group",
+ "auto.offset.reset": "largest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "t1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from t1"
- }
- ]
- }
- ],
+ "topics": "ttt",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "t1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from t1"
+ }
+ ]
+ },
"cache": {
"file.path": "hdfs://localhost/griffin/streaming/dump/target",
"info.path": "target",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["-2m", "0"]
+ "time.range": [
+ "-2m",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -132,14 +132,16 @@
},
{
"type": "record",
- "name": "missRecords",
- }
+ "name": "missRecords"
+ }
]
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
```
Above is the configure file of streaming accuracy job.
@@ -175,52 +177,50 @@
```
{
"name": "prof_streaming",
-
"process.type": "STREAMING",
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "KAFKA",
- "version": "0.8",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
- "group.id": "group1",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "sss",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "KAFKA",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "sss",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"cache": {
"file.path": "hdfs://localhost/griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"]
+ "time.range": [
+ "0",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -232,8 +232,8 @@
{
"type": "metric",
"name": "prof"
- }
- ]
+ }
+ ]
},
{
"dsl.type": "griffin-dsl",
@@ -245,13 +245,15 @@
"type": "metric",
"name": "name_group",
"flatten": "array"
- }
- ]
+ }
+ ]
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
```
Above is the configure file of streaming profiling job.
diff --git a/griffin-doc/measure/predicates.md b/griffin-doc/measure/predicates.md
index 389edbe..87f1a0d 100644
--- a/griffin-doc/measure/predicates.md
+++ b/griffin-doc/measure/predicates.md
@@ -31,7 +31,7 @@
...
"data.sources": [
...
- "connectors": [
+ "connector": {
"predicates": [
{
"type": "file.exist",
@@ -42,8 +42,7 @@
}
],
...
-
- ]
+ }
}
```
diff --git a/griffin-doc/service/api-guide.md b/griffin-doc/service/api-guide.md
index 26435d6..482dbd3 100644
--- a/griffin-doc/service/api-guide.md
+++ b/griffin-doc/service/api-guide.md
@@ -163,137 +163,131 @@
curl -k -H "Content-Type: application/json" -H "Accept: application/json" \
-X POST http://127.0.0.1:8080/api/v1/measures \
-d '{
- "name":"profiling_measure",
- "measure.type":"griffin",
- "dq.type":"PROFILING",
- "rule.description":{
- "details":[
- {
- "name":"age",
- "infos":"Total Count,Average"
- }
+ "name": "profiling_measure",
+ "measure.type": "griffin",
+ "dq.type": "PROFILING",
+ "rule.description": {
+ "details": [
+ {
+ "name": "age",
+ "infos": "Total Count,Average"
+ }
]
- },
- "process.type":"BATCH",
- "owner":"test",
- "description":"measure description",
- "data.sources":[
+ },
+ "process.type": "BATCH",
+ "owner": "test",
+ "description": "measure description",
+ "data.sources": [
{
- "name":"source",
- "connectors":[
- {
- "name":"connector_name",
- "type":"HIVE",
- "version":"1.2",
- "data.unit":"1hour",
- "data.time.zone":"UTC(WET,GMT)",
- "config":{
- "database":"default",
- "table.name":"demo_src",
- "where":"dt=#YYYYMMdd# AND hour=#HH#"
- },
- "predicates":[
- {
- "type":"file.exist",
- "config":{
- "root.path":"hdfs:///griffin/demo_src",
- "path":"/dt=#YYYYMMdd#/hour=#HH#/_DONE"
- }
- }
- ]
+ "name": "source",
+ "connector": {
+ "name": "connector_name",
+ "type": "HIVE",
+ "version": "1.2",
+ "data.unit": "1hour",
+ "data.time.zone": "UTC(WET,GMT)",
+ "config": {
+ "database": "default",
+ "table.name": "demo_src",
+ "where": "dt=#YYYYMMdd# AND hour=#HH#"
+ },
+ "predicates": [
+ {
+ "type": "file.exist",
+ "config": {
+ "root.path": "hdfs:///griffin/demo_src",
+ "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
}
+ }
]
+ }
}
- ],
- "evaluate.rule":{
- "rules":[
- {
- "dsl.type":"griffin-dsl",
- "dq.type":"PROFILING",
- "rule":"count(source.`age`) AS `age-count`,avg(source.`age`) AS `age-average`",
- "name":"profiling",
- "details":{}
- }
+ ],
+ "evaluate.rule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "PROFILING",
+ "rule": "count(source.`age`) AS `age-count`,avg(source.`age`) AS `age-average`",
+ "name": "profiling",
+ "details": {}
+ }
]
- }
-}'
+ }
+ }'
```
Here is an example to define measure of accuracy:
```
curl -k -H "Content-Type: application/json" -H "Accept: application/json" \
-X POST http://127.0.0.1:8080/api/v1/measures \
-d '{
- "name":"accuracy_measure",
- "measure.type":"griffin",
- "dq.type":"ACCURACY",
- "process.type":"BATCH",
- "owner":"test",
- "description":"measure description",
- "data.sources":[
+ "name": "accuracy_measure",
+ "measure.type": "griffin",
+ "dq.type": "ACCURACY",
+ "process.type": "BATCH",
+ "owner": "test",
+ "description": "measure description",
+ "data.sources": [
{
- "name":"source",
- "connectors":[
- {
- "name":"connector_name_source",
- "type":"HIVE",
- "version":"1.2",
- "data.unit":"1hour",
- "data.time.zone":"UTC(WET,GMT)",
- "config":{
- "database":"default",
- "table.name":"demo_src",
- "where":"dt=#YYYYMMdd# AND hour=#HH#"
- },
- "predicates":[
- {
- "type":"file.exist",
- "config":{
- "root.path":"hdfs:///griffin/demo_src",
- "path":"/dt=#YYYYMMdd#/hour=#HH#/_DONE"
- }
- }
- ]
+ "name": "source",
+ "connector": {
+ "name": "connector_name_source",
+ "type": "HIVE",
+ "version": "1.2",
+ "data.unit": "1hour",
+ "data.time.zone": "UTC(WET,GMT)",
+ "config": {
+ "database": "default",
+ "table.name": "demo_src",
+ "where": "dt=#YYYYMMdd# AND hour=#HH#"
+ },
+ "predicates": [
+ {
+ "type": "file.exist",
+ "config": {
+ "root.path": "hdfs:///griffin/demo_src",
+ "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
}
+ }
]
+ }
},
{
- "name":"target",
- "connectors":[
- {
- "name":"connector_name_target",
- "type":"HIVE",
- "version":"1.2",
- "data.unit":"1hour",
- "data.time.zone":"UTC(WET,GMT)",
- "config":{
- "database":"default",
- "table.name":"demo_tgt",
- "where":"dt=#YYYYMMdd# AND hour=#HH#"
- },
- "predicates":[
- {
- "type":"file.exist",
- "config":{
- "root.path":"hdfs:///griffin/demo_src",
- "path":"/dt=#YYYYMMdd#/hour=#HH#/_DONE"
- }
- }
- ]
+ "name": "target",
+ "connector": {
+ "name": "connector_name_target",
+ "type": "HIVE",
+ "version": "1.2",
+ "data.unit": "1hour",
+ "data.time.zone": "UTC(WET,GMT)",
+ "config": {
+ "database": "default",
+ "table.name": "demo_tgt",
+ "where": "dt=#YYYYMMdd# AND hour=#HH#"
+ },
+ "predicates": [
+ {
+ "type": "file.exist",
+ "config": {
+ "root.path": "hdfs:///griffin/demo_src",
+ "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
}
+ }
]
+ }
}
- ],
- "evaluate.rule":{
- "rules":[
- {
- "dsl.type":"griffin-dsl",
- "dq.type":"ACCURACY",
- "name":"accuracy",
- "rule":"source.desc=target.desc"
- }
+ ],
+ "evaluate.rule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "ACCURACY",
+ "name": "accuracy",
+ "rule": "source.desc=target.desc"
+ }
]
- }
-}'
+ }
+ }'
```
Here is an example to define external measure:
```
@@ -319,84 +313,90 @@
#### API Example
```bash
curl -k -H "Accept: application/json" -X GET http://127.0.0.1:8080/api/v1/measures
-[{
- "measure.type": "griffin",
- "id": 1,
- "name": "accuracy_measure",
- "owner": "test",
- "description": "measure description",
- "deleted": false,
- "dq.type": "ACCURACY",
- "sinks": ["ELASTICSEARCH", "HDFS"],
- "process.type": "BATCH",
- "data.sources": [{
- "id": 4,
- "name": "source",
- "connectors": [{
- "id": 5,
- "name": "connector_name_source",
- "type": "HIVE",
- "version": "1.2",
- "predicates": [{
- "id": 6,
- "type": "file.exist",
- "config": {
- "root.path": "hdfs:///127.0.0.1/demo_src",
- "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
- }
- }
- ],
- "data.unit": "1hour",
- "data.time.zone": "UTC(WET,GMT)",
- "config": {
- "database": "default",
- "table.name": "demo_src",
- "where": "dt=#YYYYMMdd# AND hour=#HH#"
- }
- }
- ],
- "baseline": false
- }, {
- "id": 7,
- "name": "target",
- "connectors": [{
- "id": 8,
- "name": "connector_name_target",
- "type": "HIVE",
- "version": "1.2",
- "predicates": [{
- "id": 9,
- "type": "file.exist",
- "config": {
- "root.path": "hdfs:///127.0.0.1/demo_src",
- "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
- }
- }
- ],
- "data.unit": "1hour",
- "data.time.zone": "UTC(WET,GMT)",
- "config": {
- "database": "default",
- "table.name": "demo_tgt",
- "where": "dt=#YYYYMMdd# AND hour=#HH#"
- }
- }
- ],
- "baseline": false
+[
+ {
+ "measure.type": "griffin",
+ "id": 1,
+ "name": "accuracy_measure",
+ "owner": "test",
+ "description": "measure description",
+ "deleted": false,
+ "dq.type": "ACCURACY",
+ "sinks": [
+ "ELASTICSEARCH",
+ "HDFS"
+ ],
+ "process.type": "BATCH",
+ "data.sources": [
+ {
+ "id": 4,
+ "name": "source",
+ "connector": {
+ "id": 5,
+ "name": "connector_name_source",
+ "type": "HIVE",
+ "version": "1.2",
+ "predicates": [
+ {
+ "id": 6,
+ "type": "file.exist",
+ "config": {
+ "root.path": "hdfs:///127.0.0.1/demo_src",
+ "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
+ }
}
- ],
- "evaluate.rule": {
- "id": 2,
- "rules": [{
- "id": 3,
- "rule": "source.desc=target.desc",
- "dsl.type": "griffin-dsl",
- "dq.type": "ACCURACY"
- }
- ]
+ ],
+ "data.unit": "1hour",
+ "data.time.zone": "UTC(WET,GMT)",
+ "config": {
+ "database": "default",
+ "table.name": "demo_src",
+ "where": "dt=#YYYYMMdd# AND hour=#HH#"
+ }
},
- "measure.type": "griffin"
+ "baseline": false
+ },
+ {
+ "id": 7,
+ "name": "target",
+ "connector": {
+ "id": 8,
+ "name": "connector_name_target",
+ "type": "HIVE",
+ "version": "1.2",
+ "predicates": [
+ {
+ "id": 9,
+ "type": "file.exist",
+ "config": {
+ "root.path": "hdfs:///127.0.0.1/demo_src",
+ "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
+ }
+ }
+ ],
+ "data.unit": "1hour",
+ "data.time.zone": "UTC(WET,GMT)",
+ "config": {
+ "database": "default",
+ "table.name": "demo_tgt",
+ "where": "dt=#YYYYMMdd# AND hour=#HH#"
+ }
+ },
+ "baseline": false
+ }
+ ],
+ "evaluate.rule": {
+ "id": 2,
+ "rules": [
+ {
+ "id": 3,
+ "rule": "source.desc=target.desc",
+ "dsl.type": "griffin-dsl",
+ "dq.type": "ACCURACY"
+ }
+ ]
}
+ }
]
```
@@ -422,64 +422,69 @@
curl -k -H "Content-Type: application/json" -H "Accept: application/json" \
-X PUT http://127.0.0.1:8080/api/v1/measures \
-d '{
- "measure.type": "griffin",
- "id": 19,
- "name": "profiling_measure_edited",
- "owner": "test",
- "description": "measure description",
- "deleted": false,
- "dq.type": "PROFILING",
- "sinks": ["ELASTICSEARCH", "HDFS"],
- "process.type": "BATCH",
- "rule.description": {
- "details": [{
- "name": "age",
- "infos": "Total Count,Average"
- }
+ "measure.type": "griffin",
+ "id": 19,
+ "name": "profiling_measure_edited",
+ "owner": "test",
+ "description": "measure description",
+ "deleted": false,
+ "dq.type": "PROFILING",
+ "sinks": [
+ "ELASTICSEARCH",
+ "HDFS"
+ ],
+ "process.type": "BATCH",
+ "rule.description": {
+ "details": [
+ {
+ "name": "age",
+ "infos": "Total Count,Average"
+ }
]
- },
- "data.sources": [{
- "id": 22,
- "name": "source",
- "connectors": [{
- "id": 23,
- "name": "connector_name",
- "type": "HIVE",
- "version": "1.2",
- "predicates": [{
- "id": 24,
- "type": "file.exist",
- "config": {
- "root.path": "hdfs:///griffin/demo_src",
- "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
- }
- }
- ],
- "data.unit": "1hour",
- "data.time.zone": "UTC(WET,GMT)",
- "config": {
- "database": "default",
- "table.name": "demo_src",
- "where": "dt=#YYYYMMdd# AND hour=#HH#"
- }
+ },
+ "data.sources": [
+ {
+ "id": 22,
+ "name": "source",
+ "connector": {
+ "id": 23,
+ "name": "connector_name",
+ "type": "HIVE",
+ "version": "1.2",
+ "predicates": [
+ {
+ "id": 24,
+ "type": "file.exist",
+ "config": {
+ "root.path": "hdfs:///griffin/demo_src",
+ "path": "/dt=#YYYYMMdd#/hour=#HH#/_DONE"
}
+ }
],
- "baseline": false
- }
- ],
- "evaluate.rule": {
- "id": 20,
- "rules": [{
- "id": 21,
- "rule": "count(source.`age`) AS `age-count`,avg(source.`age`) AS `age-average`",
- "dsl.type": "griffin-dsl",
- "dq.type": "PROFILING",
- "details": {}
+ "data.unit": "1hour",
+ "data.time.zone": "UTC(WET,GMT)",
+ "config": {
+ "database": "default",
+ "table.name": "demo_src",
+ "where": "dt=#YYYYMMdd# AND hour=#HH#"
}
+ },
+ "baseline": false
+ }
+ ],
+ "evaluate.rule": {
+ "id": 20,
+ "rules": [
+ {
+ "id": 21,
+ "rule": "count(source.`age`) AS `age-count`,avg(source.`age`) AS `age-average`",
+ "dsl.type": "griffin-dsl",
+ "dq.type": "PROFILING",
+ "details": {}
+ }
]
- },
- "measure.type": "griffin"
-}'
+ }
+ }'
```
Here is an example to update external measure:
```
diff --git a/measure/src/main/resources/config-batch-advanced.json b/measure/src/main/resources/config-batch-advanced.json
index 9e98efa..c6740e9 100644
--- a/measure/src/main/resources/config-batch-advanced.json
+++ b/measure/src/main/resources/config-batch-advanced.json
@@ -1,35 +1,29 @@
{
"name": "accu_batch",
-
"process.type": "batch",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
- }, {
+ }
+ },
+ {
"name": "target",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_target.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_target.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -44,7 +38,7 @@
"total": "total_count",
"matched": "matched_count"
},
- "out":[
+ "out": [
{
"type": "metric",
"name": "accu"
@@ -57,5 +51,8 @@
}
]
},
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
diff --git a/measure/src/main/resources/config-batch-path.json b/measure/src/main/resources/config-batch-path.json
index 6aab127..883a76b 100644
--- a/measure/src/main/resources/config-batch-path.json
+++ b/measure/src/main/resources/config-batch-path.json
@@ -1,35 +1,29 @@
{
"name": "accu_batch",
-
"process.type": "batch",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.path": "measure/src/test/resources/users_info_src"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.path": "measure/src/test/resources/users_info_src"
}
- ]
- }, {
+ }
+ },
+ {
"name": "target",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.path": "measure/src/test/resources/users_info_target"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.path": "measure/src/test/resources/users_info_target"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -40,5 +34,8 @@
}
]
},
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch.json
index 69ad485..d7bc337 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch.json
@@ -1,35 +1,29 @@
{
"name": "accu_batch",
-
"process.type": "batch",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "measure/src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "measure/src/test/resources/users_info_src.avro"
}
- ]
- }, {
+ }
+ },
+ {
"name": "target",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "measure/src/test/resources/users_info_target.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "measure/src/test/resources/users_info_target.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -40,5 +34,8 @@
}
]
},
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json
index 1be7a0a..9828984 100644
--- a/measure/src/main/resources/config-streaming.json
+++ b/measure/src/main/resources/config-streaming.json
@@ -1,54 +1,52 @@
{
"name": "prof_streaming",
-
"process.type": "streaming",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name" : "kafka",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.147.177.107:9092",
- "group.id": "group1",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "sss",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "kafka",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.147.177.107:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "kafka",
- "out.dataframe.name": "out1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "in.dataframe.name":"out1",
- "out.datafrmae.name": "out3",
- "rule": "select name, age from out1"
- }
- ]
- }
- ],
+ "topics": "sss",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "kafka",
+ "out.dataframe.name": "out1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "in.dataframe.name": "out1",
+ "out.datafrmae.name": "out3",
+ "rule": "select name, age from out1"
+ }
+ ]
+ },
"checkpoint": {
"file.path": "hdfs://localhost/griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"]
+ "time.range": [
+ "0",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -56,7 +54,7 @@
"dq.type": "profiling",
"out.dataframe.name": "prof",
"rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
- "out":[
+ "out": [
{
"type": "metric",
"name": "prof"
@@ -68,7 +66,7 @@
"dq.type": "profiling",
"out.dataframe.name": "grp",
"rule": "select name, count(*) as `cnt` from source group by name",
- "out":[
+ "out": [
{
"type": "metric",
"name": "name_group",
@@ -78,5 +76,7 @@
}
]
},
- "sinks": ["ELASTICSEARCH"]
+ "sinks": [
+ "ELASTICSEARCH"
+ ]
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 993f432..9264185 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -21,13 +21,7 @@
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums.{
- DqType,
- DslType,
- FlattenType,
- OutputType,
- SinkType
-}
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.enums.DqType._
import org.apache.griffin.measure.configuration.enums.DslType.{DslType, GriffinDsl}
import org.apache.griffin.measure.configuration.enums.FlattenType.{
@@ -86,31 +80,31 @@
* data source param
* @param name data source name (must)
* @param baseline data source is baseline or not, false by default (optional)
- * @param connectors data connectors (optional)
+ * @param connector data connector (optional)
* @param checkpoint data source checkpoint configuration (must in streaming mode with streaming connectors)
*/
@JsonInclude(Include.NON_NULL)
case class DataSourceParam(
@JsonProperty("name") private val name: String,
- @JsonProperty("connectors") private val connectors: List[DataConnectorParam],
+ @JsonProperty("connector") private val connector: DataConnectorParam,
@JsonProperty("baseline") private val baseline: Boolean = false,
@JsonProperty("checkpoint") private val checkpoint: Map[String, Any] = null)
extends Param {
def getName: String = name
def isBaseline: Boolean = if (Option(baseline).isDefined) baseline else false
- def getConnectors: Seq[DataConnectorParam] = if (connectors != null) connectors else Nil
+ def getConnector: Option[DataConnectorParam] = Option(connector)
def getCheckpointOpt: Option[Map[String, Any]] = Option(checkpoint)
def validate(): Unit = {
assert(StringUtils.isNotBlank(name), "data source name should not be empty")
- getConnectors.foreach(_.validate())
+ assert(getConnector.isDefined, "Connector is undefined or invalid")
+ getConnector.foreach(_.validate())
}
}
/**
* data connector param
* @param conType data connector type, e.g.: hive, avro, kafka (must)
- * @param version data connector type version (optional)
* @param dataFrameName data connector dataframe name, for pre-process input usage (optional)
* @param config detail configuration of data connector (must)
* @param preProc pre-process rules after load data (optional)
@@ -118,13 +112,11 @@
@JsonInclude(Include.NON_NULL)
case class DataConnectorParam(
@JsonProperty("type") private val conType: String,
- @JsonProperty("version") private val version: String,
@JsonProperty("dataframe.name") private val dataFrameName: String,
@JsonProperty("config") private val config: Map[String, Any],
@JsonProperty("pre.proc") private val preProc: List[RuleParam])
extends Param {
def getType: String = conType
- def getVersion: String = if (version != null) version else ""
def getDataFrameName(defName: String): String =
if (dataFrameName != null) dataFrameName else defName
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 872deb1..9eff4d0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -30,13 +30,13 @@
* data source
* @param name name of data source
* @param dsParam param of this data source
- * @param dataConnectors list of data connectors
+ * @param dataConnector data connector
* @param streamingCacheClientOpt streaming data cache client option
*/
case class DataSource(
name: String,
dsParam: DataSourceParam,
- dataConnectors: Seq[DataConnector],
+ dataConnector: Option[DataConnector],
streamingCacheClientOpt: Option[StreamingCacheClient])
extends Loggable
with Serializable {
@@ -44,7 +44,7 @@
val isBaseline: Boolean = dsParam.isBaseline
def init(): Unit = {
- dataConnectors.foreach(_.init())
+ dataConnector.foreach(_.init())
}
def loadData(context: DQContext): TimeRange = {
@@ -67,7 +67,7 @@
}
private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
- val batches = dataConnectors.flatMap { dc =>
+ val batches = dataConnector.flatMap { dc =>
val (dfOpt, timeRange) = dc.data(timestamp)
dfOpt match {
case Some(_) => Some((dfOpt, timeRange))
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 67d9544..7049063 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -25,7 +25,7 @@
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
-import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.datasource.connector.DataConnectorFactory
object DataSourceFactory extends Loggable {
@@ -45,7 +45,6 @@
dataSourceParam: DataSourceParam,
index: Int): Option[DataSource] = {
val name = dataSourceParam.getName
- val connectorParams = dataSourceParam.getConnectors
val timestampStorage = TimestampStorage()
// for streaming data cache
@@ -56,19 +55,23 @@
index,
timestampStorage)
- val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
- DataConnectorFactory.getDataConnector(
- sparkSession,
- ssc,
- connectorParam,
- timestampStorage,
- streamingCacheClientOpt) match {
- case Success(connector) => Some(connector)
- case _ => None
- }
- }
+ val connectorParamsOpt = dataSourceParam.getConnector
- Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
+ connectorParamsOpt match {
+ case Some(connectorParam) =>
+ val dataConnectors = DataConnectorFactory.getDataConnector(
+ sparkSession,
+ ssc,
+ connectorParam,
+ timestampStorage,
+ streamingCacheClientOpt) match {
+ case Success(connector) => Some(connector)
+ case _ => None
+ }
+
+ Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
+ case None => None
+ }
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
index baa5639..a6bc7cb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
@@ -31,11 +31,14 @@
def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
val name = getStepName(param.getName)
- val steps = param.getConnectors.flatMap { dc =>
- buildReadSteps(context, dc)
+
+ param.getConnector match {
+ case Some(dc) =>
+ val steps = buildReadSteps(context, dc)
+ if (steps.isDefined) Some(UnionReadStep(name, Seq(steps.get)))
+ else None
+ case _ => None
}
- if (steps.nonEmpty) Some(UnionReadStep(name, steps))
- else None
}
protected def buildReadSteps(context: DQContext, dcParam: DataConnectorParam): Option[ReadStep]
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json
index 149c839..19a49c3 100644
--- a/measure/src/test/resources/_accuracy-batch-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -1,35 +1,27 @@
{
"name": "accu_batch",
-
"process.type": "batch",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
- }, {
+ }
+ },
+ {
"name": "target",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_target.avro"
- }
+ "connector": {
+ "type": "avro",
+ "config": {
+ "file.name": "src/test/resources/users_info_target.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -44,7 +36,7 @@
"total": "total_count",
"matched": "matched_count"
},
- "out":[
+ "out": [
{
"type": "record",
"name": "missRecords"
@@ -53,6 +45,8 @@
}
]
},
-
- "sinks": ["LOG", "ELASTICSEARCH"]
+ "sinks": [
+ "LOG",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
index 9ce25df..c7c1095 100644
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -1,96 +1,96 @@
{
"name": "accu_streaming",
-
"process.type": "streaming",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name": "this",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.147.177.107:9092",
- "group.id": "group1",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "sss",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "this",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.147.177.107:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "sss",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"checkpoint": {
"type": "parquet",
"file.path": "hdfs://localhost/griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["-2m", "0"],
+ "time.range": [
+ "-2m",
+ "0"
+ ],
"updatable": true
}
- }, {
+ },
+ {
"name": "target",
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name": "this",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.147.177.107:9092",
- "group.id": "group1",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "ttt",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "this",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.147.177.107:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "t1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from t1"
- }
- ]
- }
- ],
+ "topics": "ttt",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "t1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from t1"
+ }
+ ]
+ },
"checkpoint": {
"type": "parquet",
"file.path": "hdfs://localhost/griffin/streaming/dump/target",
"info.path": "target",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["-2m", "0"]
+ "time.range": [
+ "-2m",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -105,19 +105,21 @@
"total": "total_count",
"matched": "matched_count"
},
- "out":[
+ "out": [
{
- "type":"metric",
+ "type": "metric",
"name": "accu"
},
{
- "type":"record",
+ "type": "record",
"name": "missRecords"
}
]
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
index 4f42092..c757624 100644
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -1,25 +1,19 @@
{
"name": "comp_batch",
-
"process.type": "batch",
-
"timestamp": 123456,
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -27,7 +21,7 @@
"dq.type": "completeness",
"out.dataframe.name": "comp",
"rule": "email, post_code, first_name",
- "out":[
+ "out": [
{
"type": "metric",
"name": "comp"
@@ -36,6 +30,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
index 53e1e7b..114c12d 100644
--- a/measure/src/test/resources/_completeness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -1,52 +1,50 @@
{
"name": "comp_streaming",
-
"process.type": "streaming",
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name": "this",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.147.177.107:9092",
- "group.id": "source",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "test",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "this",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.147.177.107:9092",
+ "group.id": "source",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "test",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"checkpoint": {
"file.path": "hdfs://localhost/griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"]
+ "time.range": [
+ "0",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -54,7 +52,7 @@
"dq.type": "completeness",
"out.dataframe.name": "comp",
"rule": "name, age",
- "out":[
+ "out": [
{
"type": "metric",
"name": "comp"
@@ -63,6 +61,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
index e3b1f1c..cfe5326 100644
--- a/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
@@ -1,17 +1,15 @@
{
"data.sources": [
{
- "connectors": [
- {
- "dataframe.name": "prof_table",
- "config": {
- "table.name": "efg",
- "database": "abc",
- "where": "`date`=\"20190825\""
- },
- "type": "hive"
- }
- ],
+ "connector": {
+ "dataframe.name": "prof_table",
+ "config": {
+ "table.name": "efg",
+ "database": "abc",
+ "where": "`date`=\"20190825\""
+ },
+ "type": "hive"
+ },
"name": "source"
}
],
@@ -26,16 +24,23 @@
"out.dataframe.name": "prof",
"dsl.type": "griffin-dsl",
"dq.type": "completeness",
- "error.confs":[
+ "error.confs": [
{
"column.name": "user",
"type": "enumeration",
- "values":["1", "2", "hive_none", ""]
+ "values": [
+ "1",
+ "2",
+ "hive_none",
+ ""
+ ]
},
{
"column.name": "name",
"type": "regex",
- "values":["^zhanglei.natur\\w{1}$"]
+ "values": [
+ "^zhanglei.natur\\w{1}$"
+ ]
}
],
"out": [
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json
index d946089..bef2b50 100644
--- a/measure/src/test/resources/_distinctness-batch-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -1,37 +1,30 @@
{
"name": "dist_batch",
-
"process.type": "batch",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
+ }
},
{
"name": "target",
"baseline": true,
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -48,7 +41,7 @@
"num": "num",
"duplication.array": "dup"
},
- "out":[
+ "out": [
{
"type": "metric",
"name": "distinct"
@@ -57,6 +50,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
index e3629d1..88540e4 100644
--- a/measure/src/test/resources/_distinctness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
@@ -1,8 +1,6 @@
{
"name": "dist_streaming",
-
"process.type": "streaming",
-
"data.sources": [
{
"name": "new",
@@ -11,53 +9,56 @@
"info.path": "new",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"],
+ "time.range": [
+ "0",
+ "0"
+ ],
"read.only": true
}
},
{
"name": "old",
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name": "this",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
- "group.id": "old",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "ttt",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "this",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "old",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "ttt",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"checkpoint": {
"file.path": "hdfs://localhost/griffin/streaming/dump/old",
"info.path": "old",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["-24h", "0"]
+ "time.range": [
+ "-24h",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -75,7 +76,7 @@
"num": "num",
"duplication.array": "dup"
},
- "out":[
+ "out": [
{
"type": "metric",
"name": "distinct"
@@ -84,6 +85,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
index 70cc369..d7df301 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
@@ -1,26 +1,20 @@
{
"name": "prof_batch",
-
"process.type": "batch",
-
"timestamp": 123456,
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "hive",
- "version": "1.2",
- "config": {
- "database": "default",
- "table.name": "s1"
- }
+ "connector": {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "s1"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -28,7 +22,7 @@
"dq.type": "profiling",
"out.dataframe.name": "prof",
"rule": "name, count(*) as cnt from source group by name",
- "out":[
+ "out": [
{
"type": "metric",
"name": "name_group",
@@ -41,7 +35,7 @@
"dq.type": "profiling",
"out.dataframe.name": "grp",
"rule": "age, count(*) as cnt from source group by age order by cnt",
- "out":[
+ "out": [
{
"type": "metric",
"name": "age_group",
@@ -51,6 +45,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json
index 273d2e4..e1df3da 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -1,32 +1,26 @@
{
"name": "prof_batch",
-
"process.type": "batch",
-
"timestamp": 123456,
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "dataframe.name" : "this_table",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- },
- "pre.proc": [
- {
- "dsl.type": "spark-sql",
- "rule": "select * from this_table where user_id < 10014"
- }
- ]
- }
- ]
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "dataframe.name": "this_table",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "spark-sql",
+ "rule": "select * from this_table where user_id < 10014"
+ }
+ ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -34,7 +28,7 @@
"dq.type": "profiling",
"out.dataframe.name": "prof",
"rule": "user_id, count(*) as cnt from source group by user_id",
- "out":[
+ "out": [
{
"type": "metric",
"name": "prof",
@@ -47,7 +41,7 @@
"dq.type": "profiling",
"out.dataframe.name": "grp",
"rule": "source.post_code, count(*) as cnt from source group by source.post_code order by cnt desc",
- "out":[
+ "out": [
{
"type": "metric",
"name": "post_group",
@@ -57,6 +51,7 @@
}
]
},
-
- "sinks": ["CONSOLE"]
+ "sinks": [
+ "CONSOLE"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json b/measure/src/test/resources/_profiling-batch-sparksql.json
index c8077d0..eaf0f89 100644
--- a/measure/src/test/resources/_profiling-batch-sparksql.json
+++ b/measure/src/test/resources/_profiling-batch-sparksql.json
@@ -1,25 +1,19 @@
{
"name": "prof_batch",
-
"process.type": "batch",
-
"timestamp": 123456,
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -47,6 +41,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json
index 1d28745..efe0929 100644
--- a/measure/src/test/resources/_profiling-streaming-griffindsl.json
+++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json
@@ -1,52 +1,50 @@
{
"name": "prof_streaming",
-
"process.type": "streaming",
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name": "this",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.147.177.107:9092",
- "group.id": "group1",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "test",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "this",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.147.177.107:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "test",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"checkpoint": {
"file.path": "hdfs://localhost/griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"]
+ "time.range": [
+ "0",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -54,7 +52,7 @@
"dq.type": "profiling",
"out.dataframe.name": "prof",
"rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
- "out":[
+ "out": [
{
"type": "metric",
"name": "prof"
@@ -66,7 +64,7 @@
"dq.type": "profiling",
"out.dataframe.name": "grp",
"rule": "select name, count(*) as `cnt` from source group by name",
- "out":[
+ "out": [
{
"type": "metric",
"name": "name_group",
@@ -76,6 +74,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json
index f3759ca..f9be0fa 100644
--- a/measure/src/test/resources/_timeliness-batch-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json
@@ -1,23 +1,18 @@
{
"name": "timeliness_batch",
-
"process.type": "batch",
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/timeliness_data.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/timeliness_data.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -35,9 +30,11 @@
"count": "cnt",
"step.size": "2m",
"percentile": "percentile",
- "percentile.values": [0.95]
+ "percentile.values": [
+ 0.95
+ ]
},
- "out":[
+ "out": [
{
"type": "metric",
"name": "timeliness"
@@ -50,6 +47,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
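
In the rule above, `"step.size": "2m"` buckets record latencies into 2-minute steps so the measure can report a latency distribution alongside the 0.95 percentile. A rough Spark sketch, assuming a millisecond-valued `latency` column (names are illustrative):

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, floor}

// Buckets latencies into fixed-size steps; step 0 covers [0, 2m), and so on.
def latencyHistogram(df: DataFrame, stepMs: Long = 2 * 60 * 1000L): DataFrame =
  df.withColumn("step", floor(col("latency") / stepMs))
    .groupBy("step")
    .count()
```
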
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
index 1663122..360ea88 100644
--- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
@@ -1,52 +1,50 @@
{
"name": "timeliness_streaming",
-
"process.type": "streaming",
-
"data.sources": [
{
"name": "source",
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name": "this",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
- "group.id": "group1",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "fff",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "this",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "fff",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"checkpoint": {
"file.path": "hdfs://localhost/griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"]
+ "time.range": [
+ "0",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -64,9 +62,13 @@
"count": "cnt",
"step.size": "5m",
"percentile": "percentile",
- "percentile.values": [0.2, 0.5, 0.8]
+ "percentile.values": [
+ 0.2,
+ 0.5,
+ 0.8
+ ]
},
- "out":[
+ "out": [
{
"type": "metric",
"name": "timeliness"
@@ -79,6 +81,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
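
Here `percentile.values` asks for the 20th, 50th and 80th latency percentiles rather than the single 0.95 of the batch job above. Conceptually, in Spark terms (a sketch; the timestamp column names are assumptions):

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Latency per record, then approximate percentiles over those latencies.
def latencyPercentiles(df: DataFrame): Array[Double] =
  df.withColumn("latency", col("ts_out") - col("ts_in"))
    .stat.approxQuantile("latency", Array(0.2, 0.5, 0.8), 0.01)
```
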
diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
index 2c32930..fe5eef5 100644
--- a/measure/src/test/resources/_uniqueness-batch-griffindsl.json
+++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
@@ -1,36 +1,29 @@
{
"name": "unique_batch",
-
"process.type": "batch",
-
"data.sources": [
{
"name": "source",
"baseline": true,
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
+ }
},
{
"name": "target",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
+ "connector": {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
}
- ]
+ }
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -46,7 +39,7 @@
"dup": "dup",
"num": "num"
},
- "out":[
+ "out": [
{
"type": "metric",
"name": "unique"
@@ -59,6 +52,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
index a4f4dcc..7c57748 100644
--- a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
@@ -1,94 +1,93 @@
{
"name": "unique_streaming",
-
"process.type": "streaming",
-
"data.sources": [
{
"name": "new",
"baseline": true,
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name": "this",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
- "group.id": "new",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "ttt",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "this",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "new",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "ttt",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"checkpoint": {
"file.path": "hdfs://localhost/griffin/streaming/dump/new",
"info.path": "new",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"]
+ "time.range": [
+ "0",
+ "0"
+ ]
}
},
{
"name": "old",
- "connectors": [
- {
- "type": "kafka",
- "version": "0.8",
- "dataframe.name": "this",
- "config": {
- "kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
- "group.id": "old",
- "auto.offset.reset": "smallest",
- "auto.commit.enable": "false"
- },
- "topics": "ttt",
- "key.type": "java.lang.String",
- "value.type": "java.lang.String"
+ "connector": {
+ "type": "kafka",
+ "version": "0.8",
+ "dataframe.name": "this",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "old",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
},
- "pre.proc": [
- {
- "dsl.type": "df-ops",
- "in.dataframe.name": "this",
- "out.dataframe.name": "s1",
- "rule": "from_json"
- },
- {
- "dsl.type": "spark-sql",
- "out.dataframe.name": "this",
- "rule": "select name, age from s1"
- }
- ]
- }
- ],
+ "topics": "ttt",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-ops",
+ "in.dataframe.name": "this",
+ "out.dataframe.name": "s1",
+ "rule": "from_json"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "out.dataframe.name": "this",
+ "rule": "select name, age from s1"
+ }
+ ]
+ },
"checkpoint": {
"file.path": "hdfs://localhost/griffin/streaming/dump/old",
"info.path": "old",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["-24h", "0"]
+ "time.range": [
+ "-24h",
+ "0"
+ ]
}
}
],
-
"evaluate.rule": {
"rules": [
{
@@ -105,7 +104,7 @@
"num": "num",
"duplication.array": "dup"
},
- "out":[
+ "out": [
{
"type": "metric",
"name": "unique"
@@ -118,6 +117,8 @@
}
]
},
-
- "sinks": ["CONSOLE","ELASTICSEARCH"]
+ "sinks": [
+ "CONSOLE",
+ "ELASTICSEARCH"
+ ]
}
\ No newline at end of file
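
The two Kafka sources above differ only in `group.id` and checkpoint `time.range`: "new" covers the current batch window while "old" looks back 24 hours, so uniqueness is judged against the previous day's records. Roughly, in SQL terms (an approximation; Griffin generates its own query):

```
import org.apache.spark.sql.{DataFrame, SparkSession}

// Records from the current window that also appear in the trailing 24h.
def duplicates(spark: SparkSession): DataFrame =
  spark.sql(
    """SELECT new.name, new.age, COUNT(*) AS dup
      |FROM new INNER JOIN old
      |  ON new.name = old.name AND new.age = old.age
      |GROUP BY new.name, new.age""".stripMargin)
```
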
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
index 2537931..466196b 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
@@ -81,7 +81,6 @@
val param = DataConnectorParam(
"CUSTOM",
null,
- null,
Map("class" -> classOf[ExampleBatchDataConnector].getCanonicalName),
Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
@@ -96,7 +95,6 @@
val param = DataConnectorParam(
"CUSTOM",
null,
- null,
Map("class" -> classOf[MySqlDataConnector].getCanonicalName),
Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
@@ -109,7 +107,6 @@
val param = DataConnectorParam(
"CUSTOM",
null,
- null,
Map("class" -> classOf[KafkaStreamingStringDataConnector].getCanonicalName),
Nil)
val res = DataConnectorFactory.getDataConnector(null, null, param, null, None)
@@ -121,7 +118,6 @@
val param = DataConnectorParam(
"CUSTOM",
null,
- null,
Map("class" -> classOf[NotDataConnector].getCanonicalName),
Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
@@ -138,7 +134,6 @@
val param = DataConnectorParam(
"CUSTOM",
null,
- null,
Map("class" -> classOf[DataConnectorWithoutApply].getCanonicalName),
Nil)
// apparently Scalamock can not mock classes without empty-paren constructor, providing nulls
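
These fixtures show the measure-module API change directly: the `version` argument disappears from `DataConnectorParam`, leaving four fields. Inferred from the named-argument call sites in this diff, the refactored class looks roughly like the sketch below (the real class also carries JSON deserialization annotations and validation not shown here).

```
case class DataConnectorParam(
    conType: String,          // e.g. "AVRO", "HIVE", "KAFKA", "CUSTOM"
    dataFrameName: String,    // optional dataframe alias; tests pass null
    config: Map[String, Any], // connector-specific settings
    preProc: List[RuleParam]) // optional pre-processing rules
```
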
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
index 4cf2028..3df026a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -55,7 +55,7 @@
}
private final val dcParam =
- DataConnectorParam("file", "1", "test_df", Map.empty[String, String], Nil)
+ DataConnectorParam("file", "test_df", Map.empty[String, String], Nil)
private final val timestampStorage = TimestampStorage()
// Regarding Local FileSystem
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
index e707ef7..eec2e5e 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -28,7 +28,7 @@
class JDBCBasedDataConnectorTest extends SparkSuiteBase with Matchers {
val url = "jdbc:h2:mem:test"
- var conn: java.sql.Connection = null
+ var conn: java.sql.Connection = _
val properties = new Properties()
properties.setProperty("user", "user")
properties.setProperty("password", "password")
@@ -51,7 +51,7 @@
}
private final val dcParam =
- DataConnectorParam("jdbc", "1", "test_df", Map.empty[String, String], Nil)
+ DataConnectorParam("jdbc", "test_df", Map.empty[String, String], Nil)
private final val timestampStorage = TimestampStorage()
"JDBC based data connector" should "be able to read data from relational database" in {
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index ddfc95b..2f06ad3 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -82,12 +82,8 @@
expectedResult: AccuracyResult) = {
val dqContext: DQContext = getDqContext(
dataSourcesParam = List(
- DataSourceParam(
- name = "source",
- connectors = List(dataConnectorParam(tableName = sourceName))),
- DataSourceParam(
- name = "target",
- connectors = List(dataConnectorParam(tableName = targetName)))))
+ DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
+ DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
val accuracyRule = RuleParam(
dslType = "griffin-dsl",
@@ -161,7 +157,6 @@
private def dataConnectorParam(tableName: String) = {
DataConnectorParam(
conType = "HIVE",
- version = null,
dataFrameName = null,
config = Map("table.name" -> tableName),
preProc = null)
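
This hunk also captures the companion change on the data source side: `DataSourceParam` now takes a single `connector` instead of a `connectors` list. A sketch inferred from the call sites above; the real class additionally carries settings such as `baseline` and `checkpoint` seen in the JSON configs earlier in this diff.

```
case class DataSourceParam(
    name: String,                  // data source name, e.g. "source"
    connector: DataConnectorParam, // exactly one connector per source
    baseline: Boolean = false)     // whether this source is the baseline
```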