| <h2 id="user-story">User Story</h2> |
| <p>Suppose we have two streaming data sets in different Kafka topics (source and target), and we need to measure the data quality of the target data set against the source data set.</p> |
| |
| <p>For simplicity, suppose both topics contain JSON strings like this:</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"} |
| {"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"} |
| ... |
| </code></pre></div></div> |
| |
| <h2 id="environment-preparation">Environment Preparation</h2> |
| <p>You need to prepare the environment for the Apache Griffin measure module, including the following software:</p> |
| <ul> |
| <li>JDK (1.8+)</li> |
| <li>Hadoop (2.6.0+)</li> |
| <li>Spark (2.2.1+)</li> |
| <li>Kafka (0.8.x)</li> |
| <li>Zookeeper (3.5+)</li> |
| </ul> |
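Before going further, it can help to confirm that the required tools are reachable from your shell. The sketch below is not part of Griffin, and the command names (such as `zkServer.sh`) are assumptions that may differ in your installation.

```shell
# A sanity-check sketch (not part of Griffin): warn about any required CLI
# that is missing from PATH. The command names are assumptions; adjust them
# to match how your Hadoop/Spark/Kafka/Zookeeper installs name their scripts.
require() {
  command -v "$1" >/dev/null 2>&1 || echo "missing: $1"
}

for cmd in java hadoop spark-submit kafka-topics.sh zkServer.sh; do
  require "$cmd"
done
```

Any line of output points at a tool you still need to install or add to `PATH`.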
| |
| <h2 id="build-apache-griffin-measure-module">Build Apache Griffin Measure Module</h2> |
| <ol> |
| <li>Download Apache Griffin source package <a href="https://www.apache.org/dist/griffin/0.4.0/">here</a>.</li> |
| <li>Unzip the source package. |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>unzip griffin-0.4.0-source-release.zip |
| cd griffin-0.4.0-source-release |
| </code></pre></div> </div> |
| </li> |
| <li>Build Apache Griffin jars. |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>mvn clean install |
| </code></pre></div> </div> |
| |
| <p>Move the built Apache Griffin measure jar to your work path.</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>mv measure/target/measure-0.4.0.jar <work path>/griffin-measure.jar |
| </code></pre></div> </div> |
| </li> |
| </ol> |
| |
| <h2 id="data-preparation">Data Preparation</h2> |
| |
| <p>For this quick start, we will create two Kafka topics (source and target) and generate data for them in JSON string format every minute.</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># create topics |
| # Note: this command syntax is for Kafka 0.8 |
| kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source |
| kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target |
| </code></pre></div></div> |
| <p>The data would be generated like this:</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"} |
| {"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"} |
| </code></pre></div></div> |
| <p>The source and target topics may contain some differing items. |
| You can download the <a href="/data/streaming">demo data</a> and execute <code class="language-plaintext highlighter-rouge">./streaming-data.sh</code> to generate JSON string data files and produce them into the Kafka topics every minute.</p> |
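To see what such a generator boils down to, here is a minimal sketch under stated assumptions: the `gen_record` function and its field values are illustrative, not taken from the demo script, and the producer line is shown commented out because it needs a running Kafka broker.

```shell
# Illustrative record generator (the real streaming-data.sh may differ).
# Emits one JSON record in the sample format, timestamped with the current time.
gen_record() {
  # $1: id, $2: name, $3: color
  ts=$(date +"%Y-%m-%d_%H:%M:%S")
  printf '{"id": %s, "name": "%s", "color": "%s", "time": "%s"}\n' "$1" "$2" "$3" "$ts"
}

# To feed a topic, pipe records into the console producer (requires a broker):
# gen_record 1 Apple red | kafka-console-producer.sh --broker-list localhost:9092 --topic source
```

Running such a loop against both topics, while occasionally skipping or altering a record on the target side, creates the quality gap that the accuracy measure will detect.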
| |
| <h2 id="define-data-quality-measure">Define data quality measure</h2> |
| |
| <h4 id="apache-griffin-env-configuration">Apache Griffin env configuration</h4> |
| <p>The environment config file: env.json</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{ |
| "spark": { |
| "log.level": "WARN", |
| "checkpoint.dir": "hdfs:///griffin/checkpoint", |
| "batch.interval": "20s", |
| "process.interval": "1m", |
| "init.clear": true, |
| "config": { |
| "spark.default.parallelism": 4, |
| "spark.task.maxFailures": 5, |
| "spark.streaming.kafkaMaxRatePerPartition": 1000, |
| "spark.streaming.concurrentJobs": 4, |
| "spark.yarn.maxAppAttempts": 5, |
| "spark.yarn.am.attemptFailuresValidityInterval": "1h", |
| "spark.yarn.max.executor.failures": 120, |
| "spark.yarn.executor.failuresValidityInterval": "1h", |
| "spark.hadoop.fs.hdfs.impl.disable.cache": true |
| } |
| }, |
| "sinks": [ |
| { |
| "type": "console" |
| }, |
| { |
| "type": "hdfs", |
| "config": { |
| "path": "hdfs:///griffin/persist" |
| } |
| }, |
| { |
| "type": "elasticsearch", |
| "config": { |
| "method": "post", |
| "api": "http://es:9200/griffin/accuracy" |
| } |
| } |
| ], |
| "griffin.checkpoint": [ |
| { |
| "type": "zk", |
| "config": { |
| "hosts": "zk:2181", |
| "namespace": "griffin/infocache", |
| "lock.path": "lock", |
| "mode": "persist", |
| "init.clear": true, |
| "close.clear": false |
| } |
| } |
| ] |
| } |
| </code></pre></div></div> |
| |
| <h4 id="define-griffin-data-quality">Define griffin data quality</h4> |
| <p>The DQ config file: dq.json</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{ |
| "name": "streaming_accu", |
| "process.type": "streaming", |
| "data.sources": [ |
| { |
| "name": "src", |
| "baseline": true, |
| "connectors": [ |
| { |
| "type": "kafka", |
| "version": "0.8", |
| "config": { |
| "kafka.config": { |
| "bootstrap.servers": "kafka:9092", |
| "group.id": "griffin", |
| "auto.offset.reset": "largest", |
| "auto.commit.enable": "false" |
| }, |
| "topics": "source", |
| "key.type": "java.lang.String", |
| "value.type": "java.lang.String" |
| }, |
| "pre.proc": [ |
| { |
| "dsl.type": "df-opr", |
| "rule": "from_json" |
| } |
| ] |
| } |
| ], |
| "checkpoint": { |
| "type": "json", |
| "file.path": "hdfs:///griffin/streaming/dump/source", |
| "info.path": "source", |
| "ready.time.interval": "10s", |
| "ready.time.delay": "0", |
| "time.range": ["-5m", "0"], |
| "updatable": true |
| } |
| }, { |
| "name": "tgt", |
| "connectors": [ |
| { |
| "type": "kafka", |
| "version": "0.8", |
| "config": { |
| "kafka.config": { |
| "bootstrap.servers": "kafka:9092", |
| "group.id": "griffin", |
| "auto.offset.reset": "largest", |
| "auto.commit.enable": "false" |
| }, |
| "topics": "target", |
| "key.type": "java.lang.String", |
| "value.type": "java.lang.String" |
| }, |
| "pre.proc": [ |
| { |
| "dsl.type": "df-opr", |
| "rule": "from_json" |
| } |
| ] |
| } |
| ], |
| "checkpoint": { |
| "type": "json", |
| "file.path": "hdfs:///griffin/streaming/dump/target", |
| "info.path": "target", |
| "ready.time.interval": "10s", |
| "ready.time.delay": "0", |
| "time.range": ["-1m", "0"] |
| } |
| } |
| ], |
| "evaluate.rule": { |
| "rules": [ |
| { |
| "dsl.type": "griffin-dsl", |
| "dq.type": "accuracy", |
| "out.dataframe.name": "accu", |
| "rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time", |
| "details": { |
| "source": "src", |
| "target": "tgt", |
| "miss": "miss_count", |
| "total": "total_count", |
| "matched": "matched_count" |
| }, |
| "out":[ |
| { |
| "type":"metric", |
| "name": "accu" |
| }, |
| { |
| "type":"record", |
| "name": "missRecords" |
| } |
| ] |
| } |
| ] |
| }, |
| "sinks": ["CONSOLE", "HDFS"] |
| } |
| </code></pre></div></div> |
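The accuracy rule above asks, for each source record, whether a matching target record exists. Since the rule compares every field of the sample records, the same semantics can be sketched outside Spark with a verbatim line match; the file names below are illustrative.

```shell
# Sketch of the accuracy semantics: a source record is "matched" when an
# identical record exists in the target data; miss = total - matched.
printf '%s\n' \
  '{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}' \
  '{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}' > source.txt
printf '%s\n' \
  '{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}' > target.txt

matched=$(grep -cxF -f target.txt source.txt)   # source lines found verbatim in target
total=$(wc -l < source.txt)
miss=$((total - matched))
```

The real job performs this join on the listed fields inside Spark Streaming, window by window, rather than over whole lines; the counting logic is the same.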
| |
| <h2 id="measure-data-quality">Measure data quality</h2> |
| <p>Submit the measure job to Spark, with config file paths as parameters.</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \ |
| --driver-memory 1g --executor-memory 1g --num-executors 3 \ |
| <path>/griffin-measure.jar \ |
| <path>/env.json <path>/dq.json |
| </code></pre></div></div> |
| |
| <h2 id="report-data-quality-metrics">Report data quality metrics</h2> |
| <p>While the job runs, you can watch the calculation log in the console, with the result metrics printed every minute. The results will also be saved in HDFS under <code class="language-plaintext highlighter-rouge">hdfs:///griffin/persist/<job name>/</code>, in separate directories named by calculation timestamp.</p> |
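As a quick way to read such a metric, here is a hedged sketch: the metric fragment below is hypothetical (the field names follow the "details" mapping in dq.json, but the real sink output wraps them in more structure), and the parsing uses plain sed/awk.

```shell
# Hypothetical metric fragment; field names follow the "details" mapping in
# dq.json, but the real console/HDFS output may wrap them differently.
metric='{"total_count": 300, "miss_count": 9, "matched_count": 291}'

# Extract the counts and compute the matched ratio as a percentage.
total=$(printf '%s' "$metric" | sed -n 's/.*"total_count": \([0-9][0-9]*\).*/\1/p')
matched=$(printf '%s' "$metric" | sed -n 's/.*"matched_count": \([0-9][0-9]*\).*/\1/p')
ratio=$(awk -v m="$matched" -v t="$total" 'BEGIN { printf "%.1f", 100 * m / t }')
echo "matched ratio: ${ratio}%"
```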
| |
| <h2 id="refine-data-quality-report">Refine Data Quality report</h2> |
| <p>Depending on your business needs, you might need to refine your data quality measure further until you are satisfied.</p> |
| |
| <h2 id="more-details">More Details</h2> |
| <p>For more details about Apache Griffin measures, you can visit our documentation on <a href="https://github.com/apache/griffin/tree/master/griffin-doc">GitHub</a>.</p> |
| |