---
title: Apache DataFu Hourglass - Getting Started
section_name: Getting Started
version: 0.1.3
license: >
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
---

# DataFu Hourglass

Hourglass is Apache DataFu's solution for incrementally processing data with Hadoop MapReduce. It is designed to make computations over
sliding windows more efficient.

A typical example of a sliding window is a dashboard that shows the number of visitors to every page on the site over the last thirty days.
To keep this dashboard up to date, we can schedule a query that runs daily and gathers the stats for the last 30 days.
However, this simple implementation would be wasteful: only one day of data has changed, but we'd be consuming and recalculating
the stats for all 30. A more efficient solution is to make the query incremental: using basic arithmetic, we can update the output
from the previous day by adding and subtracting input data. This enables the job to process only the new data, significantly reducing
the computational resources required.
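
To make the arithmetic concrete, here is a minimal sketch in plain Java (independent of the Hourglass API, with illustrative names) of how one day's totals can be derived from the previous day's output by adding the day that enters the window and subtracting the day that falls out of it:

    import java.util.HashMap;
    import java.util.Map;

    // A minimal sketch (plain Java, not the Hourglass API) of the incremental
    // update: adjust the previous totals using only the day entering the
    // window and the day leaving it, instead of recounting all thirty days.
    public class SlidingWindowSketch
    {
      public static Map<String,Long> update(Map<String,Long> previousTotals,
                                            Map<String,Long> enteringDay,
                                            Map<String,Long> leavingDay)
      {
        Map<String,Long> totals = new HashMap<>(previousTotals);
        // add counts from the day that just arrived
        enteringDay.forEach((page, count) -> totals.merge(page, count, Long::sum));
        // subtract counts from the day that fell out of the window
        leavingDay.forEach((page, count) -> totals.merge(page, -count, Long::sum));
        // drop pages whose count has reached zero
        totals.values().removeIf(count -> count == 0L);
        return totals;
      }
    }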

Hourglass is a framework that makes it much easier to write incremental Hadoop jobs to perform this type of computation efficiently.
It provides incremental jobs that abstract away the complexity of implementing a robust incremental solution, with the appropriate hooks
so that developers can supply custom logic to perform the aggregation task.
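
For a flavor of what those hooks look like, below is a condensed sketch modeled on the `CountById` demo job that appears later on this page. The class and schema names here are illustrative, and signatures may differ slightly across versions; the linked demo source is the authoritative reference.

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    import datafu.hourglass.model.Accumulator;
    import datafu.hourglass.model.KeyValueCollector;
    import datafu.hourglass.model.Mapper;

    // Illustrative hook implementations for a count-by-ID aggregation.
    public class CountHooks
    {
      // Avro schemas for the (key, value) pairs flowing through the job
      public static final Schema KEY_SCHEMA = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Key\",\"fields\":[{\"name\":\"member_id\",\"type\":\"long\"}]}");
      public static final Schema VALUE_SCHEMA = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}");

      // The map hook: emit a (key, value) pair for each input event.
      public static class CountMapper
        implements Mapper<GenericRecord,GenericRecord,GenericRecord>
      {
        @Override
        public void map(GenericRecord input,
                        KeyValueCollector<GenericRecord,GenericRecord> collector)
          throws IOException, InterruptedException
        {
          GenericRecord key = new GenericData.Record(KEY_SCHEMA);
          key.put("member_id", input.get("id"));
          GenericRecord value = new GenericData.Record(VALUE_SCHEMA);
          value.put("count", 1L);
          collector.collect(key, value);
        }
      }

      // The reduce hook: sum all the counts seen for a key.
      public static class CountAccumulator
        implements Accumulator<GenericRecord,GenericRecord>
      {
        private long count;

        @Override
        public void accumulate(GenericRecord value)
        {
          count += (Long)value.get("count");
        }

        @Override
        public GenericRecord getFinal()
        {
          GenericRecord output = new GenericData.Record(VALUE_SCHEMA);
          output.put("count", count);
          return output;
        }

        @Override
        public void cleanup()
        {
          count = 0L;
        }
      }
    }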

The rest of this page assumes you already have a built JAR available. If this is not the case, please see the [Download](/docs/download.html) page.

## Examples

We have provided some sample incremental jobs to demonstrate how to use Hourglass for incremental processing.
These jobs consume input data consisting of just a single `id` field, partitioned by day according to
the naming convention `/data/event/yyyy/MM/dd`. An event like this could be used to track, say, each time a user
logs into the site or views a page. Given this event data, there are certain metrics we may want
to compute daily, such as how many times each user has logged in or viewed a page.
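
Concretely, each input record conforms to an Avro schema along these lines (the record name here is illustrative; the single long `id` field is what matters):

    {
      "type" : "record",
      "name" : "Event",
      "fields" : [
        {"name" : "id", "type" : "long"}
      ]
    }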

These sample jobs are packaged in a tool that can be run from the command line. The same tool also serves as a test data generator.
Here we will walk through how to generate test data and run the jobs against it.

Build the main Hourglass JAR, build the test JAR, and copy the dependencies necessary for the demo to a single directory:

    ./gradlew :datafu-hourglass:jar :datafu-hourglass:testJar :datafu-hourglass:copyDemoDependencies

Define some variables that we'll need for the `hadoop jar` command. These list the JAR dependencies, as well as the JARs we just built.

    export LIBJARS=$(find "datafu-hourglass/build/libs" -name '*.jar' | xargs echo | tr ' ' ',')

    export LIBJARS=$LIBJARS,$(find "datafu-hourglass/build/demo_dependencies" -name '*.jar' | xargs echo | tr ' ' ',')

    export HADOOP_CLASSPATH=`echo ${LIBJARS} | sed s/,/:/g`

Assuming you've set up the `hadoop` command to run against your Hadoop cluster, you are now ready to run the jobs.

Let's define some shorthand commands to run the Hourglass JAR and dump JSON from an Avro file:

    export HOURGLASS_CMD="hadoop jar datafu-hourglass/build/libs/datafu-hourglass-<%= current_page.data.version %>-tests.jar datafu.hourglass.demo.Main"

    export TO_JSON_CMD="java -jar datafu-hourglass/build/demo_dependencies/avro-tools-1.7.4.jar tojson"

### Counting Events

In this example we will run a job that counts how many times each `id` value has appeared in an input data set. Then we will run the
job again on new data, which will incrementally update the previous result.

First we'll generate some test data under the path `/data/event` using the `generate` command from our command line tool.
The command below will create some random events for dates between 2013/03/01 and 2013/03/14, inclusive.
Each record consists of just a single long value from the range 1-100.

    $HOURGLASS_CMD generate -libjars ${LIBJARS} /data/event 2013/03/01-2013/03/14

Just to get a sense for what the data looks like, we can copy it locally and dump the first several records.

    hadoop fs -copyToLocal /data/event/2013/03/01/part-00000.avro temp.avro
    $TO_JSON_CMD temp.avro | head

This will produce output looking something like this:

    {"id":35}
    {"id":27}
    {"id":78}
    {"id":79}
    {"id":73}
    {"id":61}
    {"id":94}
    {"id":62}
    {"id":6}
    {"id":44}

Now run the `countbyid` command, which executes the sample [CountById](https://github.com/apache/datafu/blob/master/datafu-hourglass/src/test/java/datafu/hourglass/demo/CountById.java) job.
This will count the number of events for each ID value.

    $HOURGLASS_CMD countbyid -libjars ${LIBJARS} /data/event /output

In the console output you will notice that it reads all fourteen days of input that are available.
We can see what this produced by copying the output locally and dumping the first several records.
Each record consists of an ID and a count.

    rm temp.avro
    hadoop fs -copyToLocal /output/20130314/part-r-00000.avro temp.avro
    $TO_JSON_CMD temp.avro | head

This will produce output looking something like this:

    {"key":{"member_id":1},"value":{"count":162}}
    {"key":{"member_id":2},"value":{"count":136}}
    {"key":{"member_id":3},"value":{"count":137}}
    {"key":{"member_id":4},"value":{"count":142}}
    {"key":{"member_id":5},"value":{"count":149}}
    {"key":{"member_id":6},"value":{"count":145}}
    {"key":{"member_id":7},"value":{"count":131}}
    {"key":{"member_id":8},"value":{"count":143}}
    {"key":{"member_id":9},"value":{"count":138}}
    {"key":{"member_id":10},"value":{"count":160}}

Now let's generate an additional day of data, for 2013/03/15:

    $HOURGLASS_CMD generate -libjars ${LIBJARS} /data/event 2013/03/15

The job is configured to consume all available input data. But since a previous output already exists,
it can reuse this result, so it only needs to consume the previous output and the new
day of input. Let's run the incremental job again:

    $HOURGLASS_CMD countbyid -libjars ${LIBJARS} /data/event /output

You'll notice in the console output that the job considers two alternative plans. In one version it consumes
all available input data to produce the new output. In the other version it reuses the previous output
and merges it with only the new input. The second version requires fewer bytes to be consumed, so the job
picks this one. For example, if `member_id` 7 had a count of 131 through 2013/03/14 and appears, say, 9 more
times on 2013/03/15, the merged output is simply 131 + 9 = 140; none of the first fourteen days needs to be re-read.

We can download the new output and inspect the counts:

    rm temp.avro
    hadoop fs -copyToLocal /output/20130315/part-r-00000.avro temp.avro
    $TO_JSON_CMD temp.avro | head

The implementation of the `CountById` job can be found [here](https://github.com/apache/datafu/blob/master/datafu-hourglass/src/test/java/datafu/hourglass/demo/CountById.java).
A more detailed explanation of how the job works and how it is implemented can be found in our
[blog post](/blog/2013/10/03/datafus-hourglass-incremental-data-processing-in-hadoop.html).
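
For orientation, the wiring of such a job looks roughly like the following. This is a condensed sketch that reuses the hypothetical `CountHooks` classes from the sketch earlier on this page; consult the linked source for the complete, authoritative version.

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;

    import datafu.hourglass.jobs.PartitionCollapsingIncrementalJob;

    public class RunCountJob
    {
      public static void main(String[] args) throws Exception
      {
        // A partition-collapsing job merges all input days (and, when possible,
        // the previous output) into a single aggregated result.
        PartitionCollapsingIncrementalJob job = new PartitionCollapsingIncrementalJob(RunCountJob.class);
        job.setKeySchema(CountHooks.KEY_SCHEMA);
        job.setIntermediateValueSchema(CountHooks.VALUE_SCHEMA);
        job.setOutputValueSchema(CountHooks.VALUE_SCHEMA);
        job.setInputPaths(Arrays.asList(new Path("/data/event")));
        job.setOutputPath(new Path("/output"));
        job.setReusePreviousOutput(true); // allows the cheaper incremental plan
        job.setMapper(new CountHooks.CountMapper());
        job.setReducerAccumulator(new CountHooks.CountAccumulator());
        job.run();
      }
    }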

### Estimating Cardinality

Continuing from the previous example, suppose we wanted to know the number of distinct IDs over the past fifteen
days. In this example we will run a sequence of jobs that use the HyperLogLog cardinality estimation algorithm to compute this
result incrementally. The first job estimates the cardinality of IDs per day and outputs these estimates in directories
partitioned by day. Each output also includes the serialized cardinality estimator so that its state can be reconstructed.
The second job reads the fifteen days of intermediate data, merges the estimators together, and produces the final estimate
over the fifteen days. The advantage of this approach is that when a new day arrives, only that day of input needs to
be processed. Once the estimate for that day is produced, the second job can combine it with the estimates computed previously.
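
The key property being exploited here is that HyperLogLog estimators can be merged. As a sketch of the idea, here is how two per-day estimators could be combined using the stream-lib `HyperLogLogPlus` class (that the demo uses this particular estimator library is an assumption; see the linked `EstimateCardinality` source for what it actually uses):

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

    public class MergeSketch
    {
      public static void main(String[] args) throws Exception
      {
        // One estimator per day; the precision parameter 20 is illustrative.
        HyperLogLogPlus day1 = new HyperLogLogPlus(20);
        HyperLogLogPlus day2 = new HyperLogLogPlus(20);

        for (long id = 1; id <= 100; id++) day1.offer(id);    // IDs seen on day 1
        for (long id = 101; id <= 200; id++) day2.offer(id);  // IDs seen on day 2

        // Merging the per-day estimators yields an estimate over both days
        // without revisiting the raw events of either day.
        day1.addAll(day2);
        System.out.println(day1.cardinality()); // approximately 200
      }
    }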

Let's start by cleaning up the output directory:

    hadoop fs -rmr /output

If you have been following along from the previous example, you already have fifteen days of input data available,
so we don't need to regenerate it. We can run the `cardinality` command to execute the two jobs. This executes
the sample jobs in [EstimateCardinality](https://github.com/apache/datafu/blob/master/datafu-hourglass/src/test/java/datafu/hourglass/demo/EstimateCardinality.java).

    $HOURGLASS_CMD cardinality -libjars ${LIBJARS} /data/event /intermediate /output 15

You will notice in the console output that the job consumes fifteen days of input. We can then inspect the output to
see the count of distinct IDs. Note that the output record consists of the count *and* the serialized HyperLogLog estimator,
so we use `grep` to return just the count.

    rm temp.avro
    hadoop fs -copyToLocal /output/20130315/part-r-00000.avro temp.avro
    $TO_JSON_CMD temp.avro | grep -E -oh "\"count\":[0-9]+"

As the IDs in the test data are generated from the range 1-100, this produces the expected output of 100.
We'll add a new day of data, but this time we'll use the range 101-200.

    $HOURGLASS_CMD generate -libjars ${LIBJARS} /data/event 2013/03/16 101-200

Now we'll run the job again. It automatically consumes the fifteen days of data ending with the most recent day
available.

    $HOURGLASS_CMD cardinality -libjars ${LIBJARS} /data/event /intermediate /output 15

We can now inspect the output again:

    rm temp.avro
    hadoop fs -copyToLocal /output/20130316/part-r-00000.avro temp.avro
    $TO_JSON_CMD temp.avro | grep -E -oh "\"count\":[0-9]+"

This produces the expected result of 200.

The implementation of the `EstimateCardinality` job can be found [here](https://github.com/apache/datafu/blob/master/datafu-hourglass/src/test/java/datafu/hourglass/demo/EstimateCardinality.java).
A more detailed explanation of how the job works and how it is implemented can be found in our
[blog post](/blog/2013/10/03/datafus-hourglass-incremental-data-processing-in-hadoop.html).

## Next Steps

Check out [Concepts](/docs/hourglass/concepts.html) for more details on developing jobs with Hourglass.