The Apache Griffin measure module needs two configuration files to define its execution parameters: one for the environment, the other for the DQ (data quality) job.
{ "spark": { "log.level": "WARN", "checkpoint.dir": "hdfs:///griffin/streaming/cp", "batch.interval": "1m", "process.interval": "5m", "config": { "spark.default.parallelism": 5, "spark.task.maxFailures": 5, "spark.streaming.kafkaMaxRatePerPartition": 1000, "spark.streaming.concurrentJobs": 4, "spark.yarn.maxAppAttempts": 5, "spark.yarn.am.attemptFailuresValidityInterval": "1h", "spark.yarn.max.executor.failures": 120, "spark.yarn.executor.failuresValidityInterval": "1h", "spark.hadoop.fs.hdfs.impl.disable.cache": true } }, "sinks": [ { "name": "ConsoleSink", "type": "console", "config": { "max.log.lines": 100 } }, { "name": "HdfsSink", "type": "hdfs", "config": { "path": "hdfs:///griffin/streaming/persist", "max.lines.per.file": 10000 } } ], "griffin.checkpoint": [ { "type": "zk", "config": { "hosts": "<zookeeper host ip>:2181", "namespace": "griffin/infocache", "lock.path": "lock", "mode": "persist", "init.clear": true, "close.clear": false } } ] }
The example above lists the environment parameters.
Sinks allow persistence of job metrics and bad data (source records that violated the defined rules) to external storage systems. Sinks have to be defined in the Env Config, and their names are referenced in the Job Config.
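For example, a sink defined and named in the Env Config is referenced by that same name in the Job Config (a minimal sketch; the sink name `MyConsoleSink` is illustrative):

```
"sinks": [
  { "name": "MyConsoleSink", "type": "console", "config": { "max.log.lines": 100 } }
]
```

and in the Job Config:

```
"sinks": [ "MyConsoleSink" ]
```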
The supported sinks are listed in the table below. A sample sink configuration is as follows:
... "sinks": [ { "name": "ConsoleSink", "type": "CONSOLE", "config": { "numRows": 10, "truncate": false } } ] ...
Name | Type | Description | Supported Values |
---|---|---|---|
name | String | User defined unique name for Sink | |
type | String | Type of Sink (Value is case insensitive) | console, hdfs, elasticsearch, mongodb, custom |
config | Object | Configuration params of the sink | Depends on sink type (see below) |
A custom sink can be used by implementing the `org.apache.griffin.measure.sink.Sink` trait and providing its fully qualified class name in the sink config:

```
...
"sinks": [
  {
    "name": "MyCustomSink",
    "type": "CUSTOM",
    "config": {
      "class": "my.package.sink.MyCustomSinkImpl",
      ...
    }
  }
]
...
```
Note: The user-provided sink should be present in the Spark job's class path, either by providing a custom jar with the `--jars` parameter to spark-submit or by setting `spark.jars` in the `spark -> config` section of the environment config.
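For example, the second option could look like the following in the environment config (a minimal sketch; the jar path is illustrative):

```
"spark": {
  ...
  "config": {
    ...
    "spark.jars": "hdfs:///griffin/jars/my-custom-sink.jar"
  }
}
```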
The Console sink supports the following configurations. Other aliases such as 'log' can also be used as the value for `type`.
Name | Type | Description | Default Values |
---|---|---|---|
numRows | Integer | Number of records to log | 20 |
truncate | Boolean | If true, strings more than 20 characters will be truncated and all cells will be aligned right | true |
Example:
... "sinks": [ { "name": "ConsoleSink", "type": "CONSOLE", "config": { "numRows": 10, "truncate": false } } ] ...
The HDFS sink supports the following configurations:
Name | Type | Description | Default Values |
---|---|---|---|
path | String | HDFS base path to sink metrics | |
max.persist.lines | Integer | Maximum number of lines of total sink data | -1 |
max.lines.per.file | Integer | Maximum number of lines per sink file | 1000000 |
Example:
... "sinks": [ { "name": "hdfsSink", "type": "HDFS", "config": { "path": "hdfs://localhost/griffin/batch/persist", "max.persist.lines": 10000, "max.lines.per.file": 10000 } } ] ...
The MongoDB sink supports the following configurations. Other aliases such as 'mongo' can also be used as the value for `type`.
Name | Type | Description | Default Values |
---|---|---|---|
url | String | URL of MongoDB | |
database | String | Database name | |
collection | String | Collection name | |
over.time | Long | Wait Duration | -1 |
retry | Int | Number of retries | 10 |
Example:
... "sinks": [ { "name": "MongoDBSink", "type": "MongoDB", "config": { ... } } ] ...
The Elasticsearch sink supports the following configurations. Other aliases such as 'es' and 'http' can also be used as the value for `type`.
Name | Type | Description | Default Values |
---|---|---|---|
api | String | api to submit sink metrics | |
method | String | HTTP method | post |
connection.timeout | Long | Wait Duration | -1 |
retry | Integer | Number of retries | 10 |
Example:
... "sinks": [ { "name": "ElasticsearchSink", "type": "Elasticsearch", "config": { ... } } ] ...
{ "name": "accu_batch", "process.type": "BATCH", "data.sources": [ { "name": "src", "connector": { "type": "AVRO", "config": { "file.path": "<path>/<to>", "file.name": "<source-file>.avro" } } }, { "name": "tgt", "connector": { "type": "AVRO", "config": { "file.path": "<path>/<to>", "file.name": "<target-file>.avro" } } } ], "evaluate.rule": { "rules": [ { "dsl.type": "griffin-dsl", "dq.type": "ACCURACY", "out.dataframe.name": "accu", "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", "details": { "source": "source", "target": "target", "miss": "miss_count", "total": "total_count", "matched": "matched_count" }, "out": [ { "type": "metric", "name": "accu" }, { "type": "record" } ] } ] }, "sinks": [ "CONSOLESink", "HTTPSink", "HDFSSink" ] }
The example above lists the DQ job configuration parameters.
Data connectors help connect to the external sources on which DQ checks are applied.
The supported data connectors are listed in the table below. A sample data connector configuration is as follows:
... "connector": { "type": "file", "config": { "key1": "value1", "key2": "value2" } } ...
Name | Type | Description | Supported Values |
---|---|---|---|
type | String | Type of the Connector | file, hive, kafka (streaming only), jdbc, elasticsearch, custom |
config | Object | Configuration params of the connector | Depends on connector type (see below) |
"connector": { "type": "custom", "config": { "class": "org.apache.griffin.measure.datasource.connector.batch.CassandraDataConnector", ... } }
Note: The user-provided data connector should be present in the Spark job's class path, either by providing a custom jar with the `--jars` parameter to spark-submit or by setting `spark.jars` in the `spark -> config` section of the environment config.
The Elasticsearch Griffin data connector, configured as a custom connector with the class `org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector`, supports the following configurations:

Name | Type | Description | Default Values |
---|---|---|---|
index | String | ElasticSearch index name | default |
type | String | ElasticSearch data type | accuracy |
host | String | ElasticSearch url host | Empty |
port | String | ElasticSearch url port | Empty |
fields | List | list of columns | Empty |
size | Integer | data size (lines) to load | 100 |
metric.name | String | metric name to load | * |
"connectors": [ { "type": "custom", "config": { "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector", "index": "test-index-v1", "type": "metric", "host": "test.es-xxx.org", "port": "80", "fields": ["col_a", "col_b", "col_c"], "size": 20 } } ]
When `sql.mode` is set to true, the same connector supports the following configurations instead:

Name | Type | Description | Default Values |
---|---|---|---|
host | String | ElasticSearch url host | Empty |
port | String | ElasticSearch url port | Empty |
sql.mode | Boolean | use sql mode | false |
sql | String | ElasticSearch SQL | Empty |
"connectors": [ { "type": "custom", "config": { "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector", "host": "test.es-xxx.org", "port": "80", "sql.mode": true, "sql": "select col_a, col_b, col_c from test-index-v1 limit 20" } } ]
The file-based data connector reads files from distributed file systems such as HDFS; files on the local file system can be read by prefixing their paths with the `file://` namespace. It supports the following configurations:

Name | Type | Description | Supported Values | Default Values |
---|---|---|---|---|
format | String | type of file source | parquet, avro, orc, csv, tsv, text | parquet |
paths | List | path(s) to be read | Empty | |
options | Object | format specific options | Empty | |
skipOnError | Boolean | whether to continue execution if one or more paths are invalid | true, false | false |
schema | List | given as list of key value pairs | See example below | null |
"connector": { "type": "file", "config": { "format": "csv", "paths": [ "/path/to/csv/dir/*", "/path/to/dir/test.csv" ], "options": { "header": "true" }, "skipOnError": "false", "schema": [ { "name": "user_id", "type": "string", "nullable": "true" }, { "name": "age", "type": "int", "nullable": "false" } ] } }
Note: Additional examples of schema:
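For instance, a schema for a hypothetical three-column file (column names and types are illustrative) could be given as:

```
"schema": [
  { "name": "id", "type": "string", "nullable": "false" },
  { "name": "amount", "type": "double", "nullable": "true" },
  { "name": "created_at", "type": "timestamp", "nullable": "true" }
]
```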
The Elasticsearch data connector (type `elasticsearch`) supports the following configurations:

Name | Type | Description | Default Values |
---|---|---|---|
paths | List | Elasticsearch indices (Required) | |
filterExprs | List | List of string expressions that act as where conditions (row filters) | Empty |
selectionExprs | List | List of string expressions that act as selection conditions (column filters) | Empty |
options | Object | Additional elasticsearch options. Refer to ConfigurationOptions for options | Empty |
"connector": { "type": "elasticsearch", "config": { "selectionExprs": [ "account_number", "city", "gender", "age > 18" ], "filterExprs": [ "account_number < 10" ], "paths": [ "bank", "customer" ], "options": { "es.nodes": "localhost", "es.port": 9200 } } }
For the Hive data connector, the `where` condition can be used to select specific partitions, e.g. `dt=20170410 AND hour=15, dt=20170411 AND hour=15, dt=20170412 AND hour=15`.
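A minimal sketch of a Hive connector configuration using such a condition, assuming the connector accepts `database`, `table.name` and `where` keys (the table name is a placeholder):

```
"connector": {
  "type": "hive",
  "config": {
    "database": "default",
    "table.name": "<hive table>",
    "where": "dt=20170410 AND hour=15"
  }
}
```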
The JDBC data connector (type `jdbc`) supports the following configurations:

Name | Type | Description | Default Values |
---|---|---|---|
database | String | database name | default |
tablename | String | table name to be read | Empty |
url | String | the connection string URL to database | Empty |
user | String | user for connection to database | Empty |
password | String | password for connection to database | null |
driver | String | driver class for JDBC connection to database | com.mysql.jdbc.Driver |
where | String | condition for reading data from table | Empty |
"connector": { "type": "jdbc", "config": { "database": "default", "tablename": "test", "url": "jdbc:mysql://localhost:3306/default", "user": "test_u", "password": "test_p", "driver": "com.mysql.jdbc.Driver", "where": "" } }
Note: The jar containing the driver class should be present in the Spark job's class path, either by providing a custom jar with the `--jars` parameter to spark-submit or by setting `spark.jars` in the `spark -> config` section of the environment config.
`false` if not specified.

Example metric output shapes:

```
{"agg_col": "value"}
{"my_out_name": [{"agg_col": "value"}]}
{"my_out_name": {"agg_col": "value"}}
```