I"L"<h2 id="user-story">User Story</h2>
<p>Say we have two streaming data sets in different Kafka topics (source and target), and we need to measure the data quality of the target data set against the source data set.</p>
<p>For simplicity, suppose the data in both topics are JSON strings like this:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
...
</code></pre></div></div>
<h2 id="environment-preparation">Environment Preparation</h2>
<p>You need to prepare the environment for the Apache Griffin measure module, including the following software (a quick version check follows the list):</p>
<ul>
<li>JDK (1.8+)</li>
<li>Hadoop (2.6.0+)</li>
<li>Spark (2.2.1+)</li>
<li>Kafka (0.8.x)</li>
<li>Zookeeper (3.5+)</li>
</ul>
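<p>Before going further, you can confirm what is installed. A minimal sanity check, assuming the binaries are on your PATH and <code class="language-plaintext highlighter-rouge">KAFKA_HOME</code> points at your Kafka installation:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># illustrative version checks; adjust to your setup
java -version
hadoop version
spark-submit --version
zkServer.sh status
# kafka 0.8 ships no --version flag; the jar names in libs/ carry the version
ls $KAFKA_HOME/libs | grep kafka
</code></pre></div></div>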
<h2 id="build-apache-griffin-measure-module">Build Apache Griffin Measure Module</h2>
<ol>
<li>Download the Apache Griffin source package <a href="https://www.apache.org/dist/griffin/0.4.0/">here</a>.</li>
<li>Unzip the source package.
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>unzip griffin-0.4.0-source-release.zip
cd griffin-0.4.0-source-release
</code></pre></div> </div>
</li>
<li>Build Apache Griffin jars (a test-skipping variant follows this list).
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>mvn clean install
</code></pre></div> </div>
<p>Move the built Apache Griffin measure jar to your work path.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>mv measure/target/measure-0.4.0.jar &lt;work path&gt;/griffin-measure.jar
</code></pre></div> </div>
</li>
</ol>
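<p>If the full build in step 3 is slow, skipping the unit tests is a common Maven option and produces the same jars:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>mvn clean install -DskipTests
</code></pre></div></div>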
<h2 id="data-preparation">Data Preparation</h2>
<p>For this quick start, we will create two Kafka topics (source and target) and generate JSON string data for them every minute.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># create topics
# Note: this works for kafka 0.8 only
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target
</code></pre></div></div>
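<p>You can verify that both topics were created:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># list topics registered in zookeeper
kafka-topics.sh --list --zookeeper localhost:2181
kafka-topics.sh --describe --zookeeper localhost:2181 --topic source
</code></pre></div></div>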
<p>The data would be generated like this:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{"id": 1, "name": "Apple", "color": "red", "time": "2018-09-12_06:00:00"}
{"id": 2, "name": "Banana", "color": "yellow", "time": "2018-09-12_06:01:00"}
</code></pre></div></div>
<p>The source and target topics may contain some differing records.
You can download the <a href="/data/streaming">demo data</a> and execute <code class="language-plaintext highlighter-rouge">./streaming-data.sh</code> to generate JSON string data files and produce them into the Kafka topics every minute.</p>
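<p>If you would rather generate the data yourself, the sketch below shows one way to do it. It is a minimal illustration, not the actual contents of <code class="language-plaintext highlighter-rouge">streaming-data.sh</code>; the fixed record shape and one-record-per-minute rate are assumptions:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>#!/bin/bash
# minimal sketch: emit one json record per minute into both topics (kafka 0.8 console producer)
id=0
while true; do
  id=$((id + 1))
  ts=$(date +%Y-%m-%d_%H:%M:%S)
  line="{\"id\": ${id}, \"name\": \"Apple\", \"color\": \"red\", \"time\": \"${ts}\"}"
  echo "$line" | kafka-console-producer.sh --broker-list localhost:9092 --topic source
  echo "$line" | kafka-console-producer.sh --broker-list localhost:9092 --topic target
  sleep 60
done
</code></pre></div></div>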
<h2 id="define-data-quality-measure">Define data quality measure</h2>
<h4 id="apache-griffin-env-configuration">Apache Griffin env configuration</h4>
<p>The environment config file: env.json</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{
"spark": {
"log.level": "WARN",
"checkpoint.dir": "hdfs:///griffin/checkpoint",
"batch.interval": "20s",
"process.interval": "1m",
"init.clear": true,
"config": {
"spark.default.parallelism": 4,
"spark.task.maxFailures": 5,
"spark.streaming.kafkaMaxRatePerPartition": 1000,
"spark.streaming.concurrentJobs": 4,
"spark.yarn.maxAppAttempts": 5,
"spark.yarn.am.attemptFailuresValidityInterval": "1h",
"spark.yarn.max.executor.failures": 120,
"spark.yarn.executor.failuresValidityInterval": "1h",
"spark.hadoop.fs.hdfs.impl.disable.cache": true
}
},
"sinks": [
{
"type": "console"
},
{
"type": "hdfs",
"config": {
"path": "hdfs:///griffin/persist"
}
},
{
"type": "elasticsearch",
"config": {
"method": "post",
"api": "http://es:9200/griffin/accuracy"
}
}
],
"griffin.checkpoint": [
{
"type": "zk",
"config": {
"hosts": "zk:2181",
"namespace": "griffin/infocache",
"lock.path": "lock",
"mode": "persist",
"init.clear": true,
"close.clear": false
}
}
]
}
</code></pre></div></div>
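<p>The checkpoint and sink paths above must exist and be writable in HDFS. Assuming a default HDFS setup, you can create them up front:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># create the hdfs paths referenced in env.json
hdfs dfs -mkdir -p /griffin/checkpoint
hdfs dfs -mkdir -p /griffin/persist
</code></pre></div></div>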
<h4 id="define-griffin-data-quality">Define griffin data quality</h4>
<p>The DQ config file: dq.json</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{
"name": "streaming_accu",
"process.type": "streaming",
"data.sources": [
{
"name": "src",
"baseline": true,
"connectors": [
{
"type": "kafka",
"version": "0.8",
"config": {
"kafka.config": {
"bootstrap.servers": "kafka:9092",
"group.id": "griffin",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"topics": "source",
"key.type": "java.lang.String",
"value.type": "java.lang.String"
},
"pre.proc": [
{
"dsl.type": "df-opr",
"rule": "from_json"
}
]
}
],
"checkpoint": {
"type": "json",
"file.path": "hdfs:///griffin/streaming/dump/source",
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
"time.range": ["-5m", "0"],
"updatable": true
}
}, {
"name": "tgt",
"connectors": [
{
"type": "kafka",
"version": "0.8",
"config": {
"kafka.config": {
"bootstrap.servers": "kafka:9092",
"group.id": "griffin",
"auto.offset.reset": "largest",
"auto.commit.enable": "false"
},
"topics": "target",
"key.type": "java.lang.String",
"value.type": "java.lang.String"
},
"pre.proc": [
{
"dsl.type": "df-opr",
"rule": "from_json"
}
]
}
],
"checkpoint": {
"type": "json",
"file.path": "hdfs:///griffin/streaming/dump/target",
"info.path": "target",
"ready.time.interval": "10s",
"ready.time.delay": "0",
"time.range": ["-1m", "0"]
}
}
],
"evaluate.rule": {
"rules": [
{
"dsl.type": "griffin-dsl",
"dq.type": "accuracy",
"out.dataframe.name": "accu",
"rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time",
"details": {
"source": "src",
"target": "tgt",
"miss": "miss_count",
"total": "total_count",
"matched": "matched_count"
},
"out":[
{
"type":"metric",
"name": "accu"
},
{
"type":"record",
"name": "missRecords"
}
]
}
]
},
"sinks": ["CONSOLE", "HDFS"]
}
</code></pre></div></div>
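<p>The accuracy rule above matches each source record against the target within the configured time ranges. The three counters named in the <code class="language-plaintext highlighter-rouge">details</code> block relate as follows (an illustrative calculation, not actual job output):</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>matched_count = total_count - miss_count
accuracy      = matched_count / total_count
# e.g. 1000 source records with 50 unmatched:
# accuracy = (1000 - 50) / 1000 = 95%
</code></pre></div></div>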
<h2 id="measure-data-quality">Measure data quality</h2>
<p>Submit the measure job to Spark, passing the config file paths as parameters.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
&lt;path&gt;/griffin-measure.jar \
&lt;path&gt;/env.json &lt;path&gt;/dq.json
</code></pre></div></div>
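<p>Since the job runs in yarn client mode, the standard YARN CLI can be used to check on it (log aggregation settings may affect the second command):</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>yarn application -list
yarn logs -applicationId &lt;application id&gt;
</code></pre></div></div>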
<h2 id="report-data-quality-metrics">Report data quality metrics</h2>
<p>When the job runs, the calculation log appears in the console and the result metrics are printed every minute. The results are also saved in HDFS under <code class="language-plaintext highlighter-rouge">hdfs:///griffin/persist/&lt;job name&gt;/</code>, in separate directories named by calculation timestamp.</p>
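<p>For example, with the job name <code class="language-plaintext highlighter-rouge">streaming_accu</code> from dq.json, you can list the persisted result directories:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>hdfs dfs -ls hdfs:///griffin/persist/streaming_accu/
</code></pre></div></div>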
<h2 id="refine-data-quality-report">Refine Data Quality report</h2>
<p>Depending on your business needs, you might refine your data quality measure further until you are satisfied.</p>
<h2 id="more-details">More Details</h2>
<p>For more details about Apache Griffin measures, you can visit our documentation on <a href="https://github.com/apache/griffin/tree/master/griffin-doc">GitHub</a>.</p>