Apache Griffin Measure Configuration Guide

Apache Griffin measure module needs two configuration files to define the parameters of execution, one is for environment, the other is for dq job.

Environment Parameters

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
    "batch.interval": "1m",
    "process.interval": "5m",
    "config": {
      "spark.default.parallelism": 5,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },

  "sinks": [
    {
      "type": "console",
      "config": {
        "max.log.lines": 100
      }
    }, {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/streaming/persist",
        "max.lines.per.file": 10000
      }
    }
  ],

  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "<zookeeper host ip>:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}

Above lists environment parameters.

  • spark: This field configures spark and spark streaming parameters.
    • log.level: Level of spark log.
    • checkpoint.dir: Check point directory of spark streaming, for streaming mode.
    • batch.interval: Interval of dumping streaming data, for streaming mode.
    • process.interval: Interval of processing dumped streaming data, for streaming mode.
    • config: Configuration of spark parameters.
  • sinks: This field configures list of metrics sink parameters, multiple sink ways are supported. Details of sink configuration here.
  • griffin.checkpoint: This field configures list of griffin checkpoint parameters, multiple cache ways are supported. It is only for streaming dq case. Details of info cache configuration here.

Sinks

  • type: Metrics and records sink type, “console”, “hdfs”, “http”, “mongo”, “custom”.
  • config: Configure parameters of each sink type.
    • console sink (aliases: “log”)
      • max.log.lines: the max lines of log.
    • hdfs sink
      • path: hdfs path to sink metrics
      • max.persist.lines: the max lines of total sink data.
      • max.lines.per.file: the max lines of each sink file.
    • http sink (aliases: “es”, “elasticsearch”)
      • api: api to submit sink metrics.
      • method: http method, “post” default.
    • mongo sink
      • url: url of mongo db.
      • database: database name.
      • collection: collection name.
    • custom sink
      • class: class name for user-provided data sink implementation it should be implementing org.apache.griffin.measure.sink.Sink trait and have static method with signature def apply(ctx: SinkContext): Sink. User-provided data sink should be present in Spark job's class path, by providing custom jar as -jar parameter to spark-submit or by adding to “jars” list in sparkProperties.json.

Griffin Checkpoint

  • type: Griffin checkpoint type, “zk” for zookeeper checkpoint.
  • config: Configure parameters of griffin checkpoint type.
    • zookeeper checkpoint
      • hosts: zookeeper hosts list as a string, separated by comma.
      • namespace: namespace of cache info, "" as default.
      • lock.path: path of lock info, “lock” as default.
      • mode: create mode of zookeeper node, “persist” as default.
      • init.clear: clear cache info when initialize, true default.
      • close.clear: clear cache info when close connection, false default.

DQ Job Parameters

{
  "name": "accu_batch",

  "process.type": "BATCH",

  "data.sources": [
    {
      "name": "src",
      "connectors": [
        {
          "type": "AVRO",
          "version": "1.7",
          "config": {
          	"file.path": "<path>/<to>",
            "file.name": "<source-file>.avro"
          }
        }
      ]
    }, {
      "name": "tgt",
      "connectors": [
        {
          "type": "AVRO",
          "version": "1.7",
          "config": {
          	"file.path": "<path>/<to>",
            "file.name": "<target-file>.avro"
          }
        }
      ]
    }
  ],

  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "ACCURACY",
        "out.dataframe.name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
        "details": {
          "source": "source",
          "target": "target",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },        
        "out": [
          {
            "type": "metric",
            "name": "accu"
          },
          {
            "type": "record"
          }        
        ]
      }
    ]
  },
  
  "sinks": ["CONSOLE", "HTTP", "HDFS"]
}

Above lists DQ job configure parameters.

  • name: Name of DQ job.
  • process.type: Process type of DQ job, “BATCH” or “STREAMING”.
  • data.sources: List of data sources in this DQ job.
    • name: Name of this data source, it should be different from other data sources.
    • connectors: List of data connectors combined as the same data source. Details of data connector configuration here.
  • evaluate.rule: Evaluate rule parameters of this DQ job.
    • dsl.type: Default dsl type of all the rules.
    • rules: List of rules, to define every rule step. Details of rule configuration here.
  • sinks: Whitelisted sink types for this job. Note: no sinks will be used, if empty or omitted.

Data Connector

  • type: Data connector type: “AVRO”, “HIVE”, “TEXT-DIR”, “CUSTOM” for batch mode; “KAFKA”, “CUSTOM” for streaming mode.
  • version: Version string of data connector type.
  • config: Configure parameters of each data connector type.
    • avro data connector
      • file.path: avro file path, optional, "" as default.
      • file.name: avro file name.
    • hive data connector
      • database: data base name, optional, “default” as default.
      • table.name: table name.
      • where: where conditions string, split by “,”, optional. e.g. dt=20170410 AND hour=15, dt=20170411 AND hour=15, dt=20170412 AND hour=15
    • text dir data connector
      • dir.path: parent directory path.
      • data.dir.depth: integer, depth of data directories, 0 as default.
      • success.file: success file name,
      • done.file:
    • custom connector
      • class: class name for user-provided data connector implementation. For Batch it should be implementing BatchDataConnector trait and have static method with signature def apply(ctx: BatchDataConnectorContext): BatchDataConnector. For Streaming, it should be implementing StreamingDataConnector and have static method def apply(ctx: StreamingDataConnectorContext): StreamingDataConnector. User-provided data connector should be present in Spark job's class path, by providing custom jar as -jar parameter to spark-submit or by adding to “jars” list in sparkProperties.json.

Rule

  • dsl.type: Rule dsl type, “spark-sql”, “df-ops” and “griffin-dsl”.
  • dq.type: DQ type of this rule, only for “griffin-dsl” type. Supported types: “ACCURACY”, “PROFILING”, “TIMELINESS”, “UNIQUENESS”, “COMPLETENESS”.
  • out.dataframe.name (step information): Output table name of this rule, could be used in the following rules.
  • in.dataframe.name (step information): Input table name of this rule, only used for “df-ops” type.
  • rule: The rule string.
  • details: Details of this rule, optional.
    • accuracy dq type detail configuration
      • source: the data source name which as source in accuracy, default is the name of first data source in “data.sources” if not configured.
      • target: the data source name which as target in accuracy, default is the name of second data source in “data.sources” if not configured.
      • miss: the miss count name in metric, optional.
      • total: the total count name in metric, optional.
      • matched: the matched count name in metric, optional.
    • profiling dq type detail configuration
      • source: the data source name which as source in profiling, default is the name of first data source in “data.sources” if not configured. If the griffin-dsl rule contains from clause, this parameter is ignored.
    • distinctness dq type detail configuration
      • source: name of data source to measure uniqueness.
      • target: name of data source to compare with. It is always the same as source, or more than source.
      • distinct: the unique count name in metric, optional.
      • total: the total count name in metric, optional.
      • dup: the duplicate count name in metric, optional.
      • accu_dup: the accumulate duplicate count name in metric, optional, only in streaming mode and “with.accumulate” enabled.
      • num: the duplicate number name in metric, optional.
      • duplication.array: optional, if set as a non-empty string, the duplication metric will be computed, and the group metric name is this string.
      • with.accumulate: optional, default is true, if set as false, in streaming mode, the data set will not compare with old data to check distinctness.
    • uniqueness dq type detail configuration
      • source: name of data source to measure uniqueness.
      • target: name of data source to compare with. It is always the same as source, or more than source.
      • unique: the unique count name in metric, optional.
      • total: the total count name in metric, optional.
      • dup: the duplicate count name in metric, optional.
      • num: the duplicate number name in metric, optional.
      • duplication.array: optional, if set as a non-empty string, the duplication metric will be computed, and the group metric name is this string.
    • completeness dq type detail configuration
      • source: name of data source to measure completeness.
      • total: name of data source to compare with. It is always the same as source, or more than source.
      • complete: the column name in metric, optional. The number of not null values.
      • incomplete: the column name in metric, optional. The number of null values.
    • timeliness dq type detail configuration
      • source: name of data source to measure timeliness.
      • latency: the latency column name in metric, optional.
      • total: column name, optional.
      • avg: column name, optional. The average latency.
      • step: column nmae, optional. The histogram where “bin” is step=floor(latency/step.size).
      • count: column name, optional. The number of the same latencies in the concrete step.
      • percentile: column name, optional.
      • threshold: optional, if set as a time string like “1h”, the items with latency more than 1 hour will be record.
      • step.size: optional, used to build the histogram of latencies, in milliseconds (ex. “100”).
      • percentile.values: optional, used to compute the percentile metrics, values between 0 and 1. For instance, We can see fastest and slowest latencies if set [0.1, 0.9].
  • cache: Cache output dataframe. Optional, valid only for “spark-sql” and “df-ops” mode. Defaults to false if not specified.
  • out: List of output sinks for the job.
    • Metric output.
      • type: “metric”
      • name: Metric name, semantics depends on “flatten” field value.
      • flatten: Aggregation method used before sending data frame result into the sink:
        • default: use “array” if data frame returned multiple records, otherwise use “entries”
        • entries: sends first row of data frame as metric results, like like {"agg_col": "value"}
        • array: wraps all metrics into a map, like {"my_out_name": [{"agg_col": "value"}]}
        • map: wraps first row of data frame into a map, like {"my_out_name": {"agg_col": "value"}}
    • Record output. Currently handled only by HDFS sink.
      • type: “record”
      • name: File name within sink output folder to dump files to.
    • Data source cache update for streaming jobs.
      • type: “dsc-update”
      • name: Data source name to update cache.