This recipe demonstrates how task dependencies can be modeled using primitives provided by Helix. Each task runs with its desired parallelism and starts only when its upstream dependencies are met. The demo executes the task DAG described below using 10 workers. Although the demo starts the workers as threads in a single process, there is no requirement that all workers run in the same process; in a real deployment, these workers would typically run on many different machines in a cluster. When a worker fails, Helix takes care of reassigning the failed task partition to a new worker.
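To make "desired parallelism" concrete: a stage with parallelism N is split into N partitions (partition names such as filterImps_3 appear in the demo output), and each partition processes only its share of the input. The sketch below assumes a contiguous-range split of a list of `len` items; the class and method names are hypothetical, and the recipe's own split logic may differ.

```java
// Hypothetical helper (not part of the recipe): splits `len` input items
// into `numPartitions` contiguous ranges so that partition p of a stage
// knows which slice of the input it owns.
public class PartitionRanges {

  /** Returns {startInclusive, endExclusive} for partition p (0-based). */
  static long[] rangeFor(long len, int numPartitions, int p) {
    long base = len / numPartitions;
    long remainder = len % numPartitions;
    // The first `remainder` partitions take one extra item each,
    // so every item is covered exactly once.
    long start = p * base + Math.min(p, remainder);
    long size = base + (p < remainder ? 1 : 0);
    return new long[] {start, start + size};
  }

  public static void main(String[] args) {
    // e.g. 10000 impressions split across the 10 partitions of filterImps
    for (int p = 0; p < 10; p++) {
      long[] r = rangeFor(10000, 10, p);
      System.out.println("filterImps_" + p + " -> [" + r[0] + ", " + r[1] + ")");
    }
  }
}
```

With 10000 items and 10 partitions each partition gets exactly 1000 items; with an uneven split such as 103 items over 5 partitions, the first three partitions get 21 items and the last two get 20.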
Redis is used as the result store. Any other suitable implementation of TaskResultStore can be plugged in.
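Because the store is pluggable, a non-Redis implementation can be swapped in, for example for tests. The sketch below is an in-memory store behind a hypothetical `ResultStore` interface whose method names mirror the Redis commands RPUSH, LRANGE, HINCRBY and HGETALL; it is not the recipe's TaskResultStore interface, whose actual method signatures are not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a result store, e.g. for tests without a Redis server.
// ResultStore (declared below) is hypothetical: its methods mirror Redis
// commands, not the recipe's actual TaskResultStore signatures.
public class InMemoryResultStore implements ResultStore {
  private final Map<String, List<String>> lists = new HashMap<>();
  private final Map<String, Map<String, Long>> hashes = new HashMap<>();

  @Override
  public synchronized void rpush(String key, String value) {
    lists.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  // As with LRANGE, `endInclusive` is inclusive; unlike Redis, negative
  // indices are not supported in this sketch.
  @Override
  public synchronized List<String> lrange(String key, int start, int endInclusive) {
    List<String> list = lists.getOrDefault(key, new ArrayList<>());
    int from = Math.max(0, start);
    int to = Math.min(list.size(), endInclusive + 1);
    return from >= to ? new ArrayList<>() : new ArrayList<>(list.subList(from, to));
  }

  // Increments a counter field inside a hash and returns the new value.
  @Override
  public synchronized long hincrBy(String key, String field, long delta) {
    return hashes.computeIfAbsent(key, k -> new HashMap<>()).merge(field, delta, Long::sum);
  }

  // Returns a snapshot copy of all fields in a hash.
  @Override
  public synchronized Map<String, Long> hgetAll(String key) {
    return new HashMap<>(hashes.getOrDefault(key, new HashMap<>()));
  }

  public static void main(String[] args) {
    ResultStore store = new InMemoryResultStore();
    store.rpush("events", "1,false,US,M");
    store.hincrBy("counts", "M", 1);
    System.out.println(store.lrange("events", 0, 0)); // [1,false,US,M]
    System.out.println(store.hgetAll("counts"));      // {M=1}
  }
}

// Hypothetical result-store interface used by the sketch above.
interface ResultStore {
  void rpush(String key, String value);
  List<String> lrange(String key, int start, int endInclusive);
  long hincrBy(String key, String field, long delta);
  Map<String, Long> hgetAll(String key);
}
```

Every method is `synchronized` because in the demo many worker threads would share one store instance; a Redis-backed implementation gets the same atomicity from Redis itself.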
10000 impression events and around 100 click events are pre-populated in the task result store (Redis).
ImpEvent format: id,isFraudulent,country,gender
ClickEvent format: id,isFraudulent,impEventId
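A minimal sketch of parsing the two formats above, assuming isFraudulent is written as true/false and that fields contain no embedded commas; the class and method names (EventParsing, parseImp, parseClick) are hypothetical.

```java
// Hypothetical parsers for the two event formats:
//   ImpEvent:   id,isFraudulent,country,gender
//   ClickEvent: id,isFraudulent,impEventId
// Assumes isFraudulent is serialized as "true"/"false".
public class EventParsing {

  static final class ImpEvent {
    final String id;
    final boolean fraudulent;
    final String country;
    final String gender;

    ImpEvent(String id, boolean fraudulent, String country, String gender) {
      this.id = id;
      this.fraudulent = fraudulent;
      this.country = country;
      this.gender = gender;
    }
  }

  static final class ClickEvent {
    final String id;
    final boolean fraudulent;
    final String impEventId; // join key into the impression events

    ClickEvent(String id, boolean fraudulent, String impEventId) {
      this.id = id;
      this.fraudulent = fraudulent;
      this.impEventId = impEventId;
    }
  }

  static ImpEvent parseImp(String line) {
    String[] f = line.split(",", -1); // -1 keeps trailing empty fields
    if (f.length != 4) {
      throw new IllegalArgumentException("Expected 4 fields in ImpEvent: " + line);
    }
    return new ImpEvent(f[0], Boolean.parseBoolean(f[1]), f[2], f[3]);
  }

  static ClickEvent parseClick(String line) {
    String[] f = line.split(",", -1);
    if (f.length != 3) {
      throw new IllegalArgumentException("Expected 3 fields in ClickEvent: " + line);
    }
    return new ClickEvent(f[0], Boolean.parseBoolean(f[1]), f[2]);
  }

  public static void main(String[] args) {
    ImpEvent imp = parseImp("42,false,UK,F");
    ClickEvent click = parseClick("7,false,42");
    System.out.println(imp.country + " " + imp.gender + " <- click " + click.id); // UK F <- click 7
  }
}
```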
filterImps: Filters out impressions where isFraudulent=true.
filterClicks: Filters out clicks where isFraudulent=true.
impCountsByGender: Generates impression counts grouped by gender by incrementing the count for 'impression_gender_counts:<gender_value>' in the task result store (a Redis hash). Depends on: filterImps
impCountsByCountry: Generates impression counts grouped by country by incrementing the count for 'impression_country_counts:<country_value>' in the task result store (a Redis hash). Depends on: filterImps
impClickJoin: Joins each click with its corresponding impression event, using impEventId as the join key. The join is needed to pull in dimensions (country, gender) that are not present in the click event. Depends on: filterImps, filterClicks
clickCountsByGender: Generates click counts grouped by gender by incrementing the count for 'click_gender_counts:<gender_value>' in the task result store (a Redis hash). Depends on: impClickJoin
clickCountsByCountry: Generates click counts grouped by country by incrementing the count for 'click_country_counts:<country_value>' in the task result store (a Redis hash). Depends on: impClickJoin
report: Reads all aggregates generated by the previous stages and prints them. Depends on: impCountsByGender, impCountsByCountry, clickCountsByGender, clickCountsByCountry
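The stage semantics can be illustrated in a single process. The sketch below works on in-memory lists of CSV strings in the formats above rather than on the task result store, ignores parallelism, and uses hypothetical names; in particular, treating impClickJoin as an inner join that drops clicks without a surviving impression is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process version of the stage logic.
// The recipe's tasks read from and write to the task result store instead.
public class StageLogic {

  // filterImps / filterClicks: keep only events whose isFraudulent field
  // (column 1 in both formats) is not "true".
  static List<String> filterFraud(List<String> events) {
    List<String> kept = new ArrayList<>();
    for (String e : events) {
      if (!Boolean.parseBoolean(e.split(",")[1])) {
        kept.add(e);
      }
    }
    return kept;
  }

  // *CountsBy*: count records by one column (like incrementing a hash field).
  static Map<String, Long> countBy(List<String> records, int column) {
    Map<String, Long> counts = new TreeMap<>();
    for (String r : records) {
      counts.merge(r.split(",")[column], 1L, Long::sum);
    }
    return counts;
  }

  // impClickJoin: look up each click's impression by impEventId (column 2) and
  // emit "clickId,<impression CSV>", i.e. clickId,impId,isFraudulent,country,gender.
  // Assumption: clicks whose impression is missing are dropped (inner join).
  static List<String> join(List<String> imps, List<String> clicks) {
    Map<String, String> impById = new HashMap<>();
    for (String imp : imps) {
      impById.put(imp.split(",")[0], imp);
    }
    List<String> joined = new ArrayList<>();
    for (String click : clicks) {
      String[] c = click.split(",");
      String imp = impById.get(c[2]);
      if (imp != null) {
        joined.add(c[0] + "," + imp);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    List<String> imps = filterFraud(List.of("i1,false,US,F", "i2,true,UK,M", "i3,false,UK,M"));
    List<String> clicks = filterFraud(List.of("c1,false,i1", "c2,false,i3", "c3,true,i3"));
    List<String> joined = join(imps, clicks);
    System.out.println(countBy(imps, 3));   // impressions by gender: {F=1, M=1}
    System.out.println(countBy(joined, 3)); // clicks by country: {UK=1, US=1}
    System.out.println(countBy(joined, 4)); // clicks by gender: {F=1, M=1}
  }
}
```

The join shifts the impression columns right by one, which is why click counts use columns 3 and 4 while impression counts use columns 2 and 3.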
Each stage is represented as a Node along with its upstream dependencies and desired parallelism. Each stage is modeled as a resource in Helix using the OnlineOffline state model. As part of the Offline-to-Online transition, we watch the external view of the upstream resources and wait for them to transition to the Online state. See Task.java for additional details.
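The dependency mechanism can be approximated in a single JVM with one CountDownLatch per stage: a stage blocks on its upstream latches before doing its work and then releases its own latch, mirroring the Offline-to-Online wait described above. This is only an analogy; Helix coordinates across processes through the external view, and the class below (UpstreamWait) is hypothetical and runs one thread per stage instead of one per partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

// In-process analogy only; this is NOT how Helix is called. Each stage
// thread waits until all upstream stages are "online" before doing its work.
public class UpstreamWait {

  /** Runs all stages; returns the order in which they completed their work. */
  static List<String> run(Map<String, List<String>> upstream) {
    Map<String, CountDownLatch> online = new LinkedHashMap<>();
    for (String stage : upstream.keySet()) {
      online.put(stage, new CountDownLatch(1));
    }
    List<String> completed = Collections.synchronizedList(new ArrayList<>());
    List<Thread> threads = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : upstream.entrySet()) {
      Thread t = new Thread(() -> {
        try {
          for (String dep : e.getValue()) {
            online.get(dep).await();          // wait until the upstream stage is online
          }
          completed.add(e.getKey());          // stand-in for the stage's real work
          online.get(e.getKey()).countDown(); // signal downstream stages
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      });
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(ex);
      }
    }
    return completed;
  }

  public static void main(String[] args) {
    Map<String, List<String>> upstream = new LinkedHashMap<>();
    upstream.put("report", List.of("impCountsByGender", "impCountsByCountry",
        "clickCountsByGender", "clickCountsByCountry"));
    upstream.put("clickCountsByGender", List.of("impClickJoin"));
    upstream.put("clickCountsByCountry", List.of("impClickJoin"));
    upstream.put("impClickJoin", List.of("filterImps", "filterClicks"));
    upstream.put("impCountsByGender", List.of("filterImps"));
    upstream.put("impCountsByCountry", List.of("filterImps"));
    upstream.put("filterImps", List.of());
    upstream.put("filterClicks", List.of());
    // Threads are started downstream-first, yet completion respects the DAG.
    System.out.println(run(upstream));
  }
}
```

The exact printed order varies between runs, but every stage always appears after all of its upstream stages. As with the real DAG, a dependency cycle would make the threads wait forever.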
    // Node(name, parallelism, comma-separated list of upstream nodes)
    Dag dag = new Dag();
    dag.addNode(new Node("filterImps", 10, ""));
    dag.addNode(new Node("filterClicks", 5, ""));
    dag.addNode(new Node("impClickJoin", 10, "filterImps,filterClicks"));
    dag.addNode(new Node("impCountsByGender", 10, "filterImps"));
    dag.addNode(new Node("impCountsByCountry", 10, "filterImps"));
    dag.addNode(new Node("clickCountsByGender", 5, "impClickJoin"));
    dag.addNode(new Node("clickCountsByCountry", 5, "impClickJoin"));
    dag.addNode(new Node("report", 1, "impCountsByGender,impCountsByCountry,clickCountsByGender,clickCountsByCountry"));
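Because every stage waits for its upstream stages, a valid DAG must admit an order in which each stage follows all of its parents. The sketch below checks this for the DAG defined above using Kahn's algorithm; StageNode is a hypothetical stand-in that mimics the (name, parallelism, comma-separated upstream list) arguments of Node and is not the recipe's class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical DAG check using Kahn's algorithm (not the recipe's Dag class).
public class DagOrder {

  static final class StageNode {
    final String id;
    final int parallelism;
    final List<String> parents = new ArrayList<>();

    StageNode(String id, int parallelism, String parentIds) {
      this.id = id;
      this.parallelism = parallelism;
      for (String p : parentIds.split(",")) {
        if (!p.trim().isEmpty()) {
          parents.add(p.trim());
        }
      }
    }
  }

  // Returns the stages in an order where every stage follows all its parents;
  // throws if a parent is unknown or the graph contains a cycle.
  static List<String> topologicalOrder(List<StageNode> nodes) {
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    Map<String, List<String>> children = new HashMap<>();
    for (StageNode n : nodes) {
      inDegree.put(n.id, n.parents.size());
      children.putIfAbsent(n.id, new ArrayList<>());
    }
    for (StageNode n : nodes) {
      for (String p : n.parents) {
        if (!children.containsKey(p)) {
          throw new IllegalArgumentException("Unknown upstream '" + p + "' for " + n.id);
        }
        children.get(p).add(n.id);
      }
    }
    Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach((id, deg) -> { if (deg == 0) ready.add(id); });
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String id = ready.poll();
      order.add(id);
      for (String child : children.get(id)) {
        if (inDegree.merge(child, -1, Integer::sum) == 0) {
          ready.add(child); // all parents of `child` are now done
        }
      }
    }
    if (order.size() != nodes.size()) {
      throw new IllegalStateException("Cycle detected among stages");
    }
    return order;
  }

  public static void main(String[] args) {
    List<StageNode> dag = new ArrayList<>();
    dag.add(new StageNode("filterImps", 10, ""));
    dag.add(new StageNode("filterClicks", 5, ""));
    dag.add(new StageNode("impClickJoin", 10, "filterImps,filterClicks"));
    dag.add(new StageNode("impCountsByGender", 10, "filterImps"));
    dag.add(new StageNode("impCountsByCountry", 10, "filterImps"));
    dag.add(new StageNode("clickCountsByGender", 5, "impClickJoin"));
    dag.add(new StageNode("clickCountsByCountry", 5, "impClickJoin"));
    dag.add(new StageNode("report", 1,
        "impCountsByGender,impCountsByCountry,clickCountsByGender,clickCountsByCountry"));
    System.out.println(topologicalOrder(dag));
    // [filterImps, filterClicks, impCountsByGender, impCountsByCountry, impClickJoin,
    //  clickCountsByGender, clickCountsByCountry, report]
  }
}
```

This is one valid order; at runtime the stages with no dependency between them (for example impCountsByGender and impClickJoin) can run concurrently.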
To run the demo, use the following steps.
See http://redis.io/topics/quickstart for how to install a Redis server.
Start Redis (for example, in a separate terminal), then clone, build, and run the demo:

    ./redis-server --port 6379
    git clone https://git-wip-us.apache.org/repos/asf/helix.git
    cd helix/recipes/task-execution
    mvn clean install package -DskipTests
    cd target/task-execution-pkg/bin
    chmod +x task-execution-demo.sh
    ./task-execution-demo.sh 2181 localhost 6379
               +------------------+               +------------------+
               |    filterImps    |               |   filterClicks   |
               | (parallelism=10) |               | (parallelism=5)  |
               +--+----------+--+-+               +-----------+------+
                  |          |  |                             |
                  |          |  +-----------------------+     |
                  v          v                          v     v
    +-------------------+  +--------------------+ +------------------+
    | impCountsByGender |  | impCountsByCountry | |   impClickJoin   |
    | (parallelism=10)  |  |  (parallelism=10)  | | (parallelism=10) |
    +---------+---------+  +----+---------------+ +---+-----------+--+
              |                 |                     |           |
              |                 |                     v           v
              |                 |   +---------------------+ +----------------------+
              |                 |   | clickCountsByGender | | clickCountsByCountry |
              |                 |   |   (parallelism=5)   | |   (parallelism=5)    |
              |                 |   +----------+----------+ +----------+-----------+
              |                 |              |                       |
              +-----------------+----------+---+-----------------------+
                                           v
                                  +-----------------+
                                  |      report     |
                                  | (parallelism=1) |
                                  +-----------------+
(Credit for the ASCII art above: http://www.asciiflow.com)
Sample output from a run of the demo:

    Done populating dummy data
    Executing filter task for filterImps_3 for impressions_demo
    Executing filter task for filterImps_2 for impressions_demo
    Executing filter task for filterImps_0 for impressions_demo
    Executing filter task for filterImps_1 for impressions_demo
    Executing filter task for filterImps_4 for impressions_demo
    Executing filter task for filterClicks_3 for clicks_demo
    Executing filter task for filterClicks_1 for clicks_demo
    Executing filter task for filterImps_8 for impressions_demo
    Executing filter task for filterImps_6 for impressions_demo
    Executing filter task for filterClicks_2 for clicks_demo
    Executing filter task for filterClicks_0 for clicks_demo
    Executing filter task for filterImps_7 for impressions_demo
    Executing filter task for filterImps_5 for impressions_demo
    Executing filter task for filterClicks_4 for clicks_demo
    Executing filter task for filterImps_9 for impressions_demo
    Running AggTask for impCountsByGender_3 for filtered_impressions_demo gender
    Running AggTask for impCountsByGender_2 for filtered_impressions_demo gender
    Running AggTask for impCountsByGender_0 for filtered_impressions_demo gender
    Running AggTask for impCountsByGender_9 for filtered_impressions_demo gender
    Running AggTask for impCountsByGender_1 for filtered_impressions_demo gender
    Running AggTask for impCountsByGender_4 for filtered_impressions_demo gender
    Running AggTask for impCountsByCountry_4 for filtered_impressions_demo country
    Running AggTask for impCountsByGender_5 for filtered_impressions_demo gender
    Executing JoinTask for impClickJoin_2
    Running AggTask for impCountsByCountry_3 for filtered_impressions_demo country
    Running AggTask for impCountsByCountry_1 for filtered_impressions_demo country
    Running AggTask for impCountsByCountry_0 for filtered_impressions_demo country
    Running AggTask for impCountsByCountry_2 for filtered_impressions_demo country
    Running AggTask for impCountsByGender_6 for filtered_impressions_demo gender
    Executing JoinTask for impClickJoin_1
    Executing JoinTask for impClickJoin_0
    Executing JoinTask for impClickJoin_3
    Running AggTask for impCountsByGender_8 for filtered_impressions_demo gender
    Executing JoinTask for impClickJoin_4
    Running AggTask for impCountsByGender_7 for filtered_impressions_demo gender
    Running AggTask for impCountsByCountry_5 for filtered_impressions_demo country
    Running AggTask for impCountsByCountry_6 for filtered_impressions_demo country
    Executing JoinTask for impClickJoin_9
    Running AggTask for impCountsByCountry_8 for filtered_impressions_demo country
    Running AggTask for impCountsByCountry_7 for filtered_impressions_demo country
    Executing JoinTask for impClickJoin_5
    Executing JoinTask for impClickJoin_6
    Running AggTask for impCountsByCountry_9 for filtered_impressions_demo country
    Executing JoinTask for impClickJoin_8
    Executing JoinTask for impClickJoin_7
    Running AggTask for clickCountsByCountry_1 for joined_clicks_demo country
    Running AggTask for clickCountsByCountry_0 for joined_clicks_demo country
    Running AggTask for clickCountsByCountry_2 for joined_clicks_demo country
    Running AggTask for clickCountsByCountry_3 for joined_clicks_demo country
    Running AggTask for clickCountsByGender_1 for joined_clicks_demo gender
    Running AggTask for clickCountsByCountry_4 for joined_clicks_demo country
    Running AggTask for clickCountsByGender_3 for joined_clicks_demo gender
    Running AggTask for clickCountsByGender_2 for joined_clicks_demo gender
    Running AggTask for clickCountsByGender_4 for joined_clicks_demo gender
    Running AggTask for clickCountsByGender_0 for joined_clicks_demo gender
    Running reports task
    Impression counts per country
    {CANADA=1940, US=1958, CHINA=2014, UNKNOWN=2022, UK=1946}
    Click counts per country
    {US=24, CANADA=14, CHINA=26, UNKNOWN=14, UK=22}
    Impression counts per gender
    {F=3325, UNKNOWN=3259, M=3296}
    Click counts per gender
    {F=33, UNKNOWN=32, M=35}
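The sample report can be cross-checked with simple arithmetic. Both impression breakdowns add up to 9880, so (assuming every surviving impression is counted exactly once per breakdown, with UNKNOWN as its own bucket) 120 of the 10000 pre-populated impressions were removed by filterImps; both click breakdowns add up to 100. A short Java sketch of the check, using only the numbers printed in the report:

```java
// Cross-check of the sample report, using the printed counts.
public class ReportTotals {

  static long sum(long... values) {
    long total = 0;
    for (long v : values) {
      total += v;
    }
    return total;
  }

  public static void main(String[] args) {
    long impsByCountry = sum(1940, 1958, 2014, 2022, 1946); // CANADA, US, CHINA, UNKNOWN, UK
    long impsByGender = sum(3325, 3259, 3296);              // F, UNKNOWN, M
    long clicksByCountry = sum(24, 14, 26, 14, 22);         // US, CANADA, CHINA, UNKNOWN, UK
    long clicksByGender = sum(33, 32, 35);                  // F, UNKNOWN, M
    System.out.println(impsByCountry + " " + impsByGender);     // 9880 9880
    System.out.println(clicksByCountry + " " + clicksByGender); // 100 100
    System.out.println(10000 - impsByCountry);                  // 120
  }
}
```

Matching totals across the country and gender breakdowns indicate that each aggregation stage processed every filtered record exactly once, even though the partitions ran in an interleaved order.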