| ---++ Contents |
   * <a href="#Onboarding_Steps">Onboarding Steps</a>
   * <a href="#Sample_Pipeline">Sample Pipeline</a>
| * [[HiveIntegration][Hive Examples]] |
| |
| ---+++ Onboarding Steps |
   * Create a cluster definition for the cluster, specifying the name node, job tracker, workflow engine and messaging endpoints. Refer to [[EntitySpecification][cluster definition]] for details.
   * Create feed definitions for each of the inputs and outputs, specifying frequency, data path and ownership. Refer to [[EntitySpecification][feed definition]] for details.
   * Create a process definition for your job. The process defines the configuration for the workflow job; important attributes are frequency, inputs/outputs and workflow path. Refer to [[EntitySpecification][process definition]] for process details.
   * Define the workflow for your job using the workflow engine (only Oozie is supported as of now). Refer to the [[http://oozie.apache.org/docs/3.1.3-incubating/WorkflowFunctionalSpec.html][Oozie Workflow Specification]]. The libraries required by the workflow should be available in the lib folder under the workflow path.
   * Set up the workflow definition, libraries and referenced scripts on hadoop.
   * Submit the cluster definition.
   * Submit and schedule the feed and process definitions (sample commands are sketched after this list).
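The entity definitions are submitted through the Falcon CLI. A minimal sketch, assuming the entity XMLs from the next section are saved locally as cluster.xml, input-feed.xml, output-feed.xml and process.xml (hypothetical file names), and the workflow files sit in a local workflow/ directory:
<verbatim>
# Stage the workflow definition and its libraries on HDFS
# (the workflow path comes from the process definition below)
hadoop fs -mkdir -p /projects/bootcamp/workflow/lib
hadoop fs -put workflow/workflow.xml /projects/bootcamp/workflow/
hadoop fs -put workflow/lib/*.jar /projects/bootcamp/workflow/lib/

# Submit the cluster first; feeds and processes reference it
falcon entity -type cluster -submit -file cluster.xml

# Submit and schedule the feeds and the process
falcon entity -type feed -submitAndSchedule -file input-feed.xml
falcon entity -type feed -submitAndSchedule -file output-feed.xml
falcon entity -type process -submitAndSchedule -file process.xml
</verbatim>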
| |
| |
| ---+++ Sample Pipeline |
| ---++++ Cluster |
A cluster definition contains the endpoints for the name node, job tracker, Oozie and the JMS server. The cluster locations MUST be created prior to submitting the cluster entity to Falcon (a sketch of the required commands follows the definition):
   * *staging* must have 777 permissions and the parent dirs must have execute permissions
   * *working* must have 755 permissions and the parent dirs must have execute permissions
| |
| <verbatim> |
| <?xml version="1.0"?> |
| <!-- |
| Cluster configuration |
| --> |
| <cluster colo="ua2" description="" name="corp" xmlns="uri:falcon:cluster:0.1" |
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> |
| <interfaces> |
| <interface type="readonly" endpoint="hftp://name-node.com:50070" version="2.5.0" /> |
| |
| <interface type="write" endpoint="hdfs://name-node.com:54310" version="2.5.0" /> |
| |
| <interface type="execute" endpoint="job-tracker:54311" version="2.5.0" /> |
| |
| <interface type="workflow" endpoint="http://oozie.com:11000/oozie/" version="4.0.1" /> |
| |
| <interface type="messaging" endpoint="tcp://jms-server.com:61616?daemon=true" version="5.1.6" /> |
| </interfaces> |
| |
| <locations> |
| <location name="staging" path="/projects/falcon/staging" /> |
| <location name="temp" path="/tmp" /> |
| <location name="working" path="/projects/falcon/working" /> |
| </locations> |
| </cluster> |
| </verbatim> |
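A minimal sketch of the commands that create the cluster locations with the required permissions, assuming the paths from the definition above:
<verbatim>
# Parent dirs need execute permissions so Falcon can traverse them
hadoop fs -mkdir -p /projects/falcon/staging /projects/falcon/working
hadoop fs -chmod 755 /projects /projects/falcon
hadoop fs -chmod 777 /projects/falcon/staging
hadoop fs -chmod 755 /projects/falcon/working
</verbatim>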
| |
| ---++++ Input Feed |
An hourly input feed that defines the data path, frequency, ownership and validity:
| <verbatim> |
| <?xml version="1.0" encoding="UTF-8"?> |
| <!-- |
| Hourly sample input data |
| --> |
| |
| <feed description="sample input data" name="SampleInput" xmlns="uri:falcon:feed:0.1" |
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> |
| <groups>group</groups> |
| |
| <frequency>hours(1)</frequency> |
| |
| <late-arrival cut-off="hours(6)" /> |
| |
| <clusters> |
| <cluster name="corp" type="source"> |
| <validity start="2009-01-01T00:00Z" end="2099-12-31T00:00Z" timezone="UTC" /> |
| <retention limit="months(24)" action="delete" /> |
| </cluster> |
| </clusters> |
| |
| <locations> |
| <location type="data" path="/projects/bootcamp/data/${YEAR}-${MONTH}-${DAY}-${HOUR}/SampleInput" /> |
| <location type="stats" path="/projects/bootcamp/stats/SampleInput" /> |
| <location type="meta" path="/projects/bootcamp/meta/SampleInput" /> |
| </locations> |
| |
| <ACL owner="suser" group="users" permission="0755" /> |
| |
| <schema location="/none" provider="none" /> |
| </feed> |
| </verbatim> |
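For example, with the location template above, the feed instance for 2012-04-02 05:00 UTC resolves to the path below (${MONTH}, ${DAY} and ${HOUR} are zero-padded):
<verbatim>
/projects/bootcamp/data/2012-04-02-05/SampleInput
</verbatim>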
| |
| ---++++ Output Feed |
A daily output feed that defines the data path, frequency, ownership and validity:
| <verbatim> |
| <?xml version="1.0" encoding="UTF-8"?> |
| <!-- |
| Daily sample output data |
| --> |
| |
| <feed description="sample output data" name="SampleOutput" xmlns="uri:falcon:feed:0.1" |
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> |
| <groups>group</groups> |
| |
| <frequency>days(1)</frequency> |
| |
| <late-arrival cut-off="hours(6)" /> |
| |
| <clusters> |
| <cluster name="corp" type="source"> |
| <validity start="2009-01-01T00:00Z" end="2099-12-31T00:00Z" timezone="UTC" /> |
| <retention limit="months(24)" action="delete" /> |
| </cluster> |
| </clusters> |
| |
| <locations> |
| <location type="data" path="/projects/bootcamp/output/${YEAR}-${MONTH}-${DAY}/SampleOutput" /> |
| <location type="stats" path="/projects/bootcamp/stats/SampleOutput" /> |
| <location type="meta" path="/projects/bootcamp/meta/SampleOutput" /> |
| </locations> |
| |
| <ACL owner="suser" group="users" permission="0755" /> |
| |
| <schema location="/none" provider="none" /> |
| </feed> |
| </verbatim> |
| |
| ---++++ Process |
Sample process that runs daily at the 6th hour on the corp cluster. It takes one input, !SampleInput, for the previous day (24 hourly instances), and generates one output, !SampleOutput, for the previous day. The workflow is defined at /projects/bootcamp/workflow/workflow.xml, and any libraries required by the workflow should be at /projects/bootcamp/workflow/lib. The process also defines the properties queueName, ssh.host and fileTimestamp, which are passed to the workflow. In addition, Falcon exposes the following properties to the workflow: nameNode and jobTracker (Hadoop properties), and input and output (input/output properties).
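For illustration, the run with nominal time 2012-04-04T06:00Z resolves yesterday(0,0) to 2012-04-03T00:00Z and today(-1,0) to 2012-04-03T23:00Z, so the workflow would receive roughly the following (assuming Falcon's usual comma-separated form for multi-instance inputs):
<verbatim>
input  = /projects/bootcamp/data/2012-04-03-00/SampleInput,
         /projects/bootcamp/data/2012-04-03-01/SampleInput,
         ...
         /projects/bootcamp/data/2012-04-03-23/SampleInput

output = /projects/bootcamp/output/2012-04-03/SampleOutput
</verbatim>
The process definition: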
| |
| <verbatim> |
| <?xml version="1.0" encoding="UTF-8"?> |
| <!-- |
| Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday |
| --> |
<process name="SampleProcess" xmlns="uri:falcon:process:0.1">
| <cluster name="corp" /> |
| |
| <frequency>days(1)</frequency> |
| |
| <validity start="2012-04-03T06:00Z" end="2022-12-30T00:00Z" timezone="UTC" /> |
| |
| <inputs> |
| <input name="input" feed="SampleInput" start="yesterday(0,0)" end="today(-1,0)" /> |
| </inputs> |
| |
| <outputs> |
| <output name="output" feed="SampleOutput" instance="yesterday(0,0)" /> |
| </outputs> |
| |
| <properties> |
| <property name="queueName" value="reports" /> |
| <property name="ssh.host" value="host.com" /> |
| <property name="fileTimestamp" value="${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}" /> |
| </properties> |
| |
| <workflow engine="oozie" path="/projects/bootcamp/workflow" /> |
| |
| <retry policy="periodic" delay="minutes(5)" attempts="3" /> |
| |
| <late-process policy="exp-backoff" delay="hours(1)"> |
| <late-input input="input" workflow-path="/projects/bootcamp/workflow/lateinput" /> |
| </late-process> |
| </process> |
| </verbatim> |
| |
| ---++++ Oozie Workflow |
The sample user workflow contains three actions:
   * Pig action - executes the Pig script /projects/bootcamp/workflow/script.pig
   * concatenator - a Java action that concatenates the part files and generates a single file
   * file upload - an ssh action that gets the concatenated file from hadoop and sends it to a remote host
| |
| <verbatim> |
| <workflow-app xmlns="uri:oozie:workflow:0.2" name="sample-wf"> |
| <start to="pig" /> |
| |
| <action name="pig"> |
| <pig> |
| <job-tracker>${jobTracker}</job-tracker> |
| <name-node>${nameNode}</name-node> |
| <prepare> |
| <delete path="${output}"/> |
| </prepare> |
| <configuration> |
| <property> |
| <name>mapred.job.queue.name</name> |
| <value>${queueName}</value> |
| </property> |
| <property> |
| <name>mapreduce.fileoutputcommitter.marksuccessfuljobs</name> |
| <value>true</value> |
| </property> |
| </configuration> |
| <script>${nameNode}/projects/bootcamp/workflow/script.pig</script> |
| <param>input=${input}</param> |
| <param>output=${output}</param> |
| <file>lib/dependent.jar</file> |
| </pig> |
| <ok to="concatenator" /> |
| <error to="fail" /> |
| </action> |
| |
| <action name="concatenator"> |
| <java> |
| <job-tracker>${jobTracker}</job-tracker> |
| <name-node>${nameNode}</name-node> |
| <prepare> |
| <delete path="${nameNode}/projects/bootcamp/concat/data-${fileTimestamp}.csv"/> |
| </prepare> |
| <configuration> |
| <property> |
| <name>mapred.job.queue.name</name> |
| <value>${queueName}</value> |
| </property> |
| </configuration> |
| <main-class>com.wf.Concatenator</main-class> |
| <arg>${output}</arg> |
| <arg>${nameNode}/projects/bootcamp/concat/data-${fileTimestamp}.csv</arg> |
| </java> |
| <ok to="fileupload" /> |
| <error to="fail"/> |
| </action> |
| |
| <action name="fileupload"> |
| <ssh> |
| <host>localhost</host> |
| <command>/tmp/fileupload.sh</command> |
| <args>${nameNode}/projects/bootcamp/concat/data-${fileTimestamp}.csv</args> |
| <args>${wf:conf("ssh.host")}</args> |
| <capture-output/> |
| </ssh> |
| <ok to="fileUploadDecision" /> |
| <error to="fail"/> |
| </action> |
| |
| <decision name="fileUploadDecision"> |
| <switch> |
| <case to="end"> |
| ${wf:actionData('fileupload')['output'] == '0'} |
| </case> |
| <default to="fail"/> |
| </switch> |
| </decision> |
| |
| <kill name="fail"> |
| <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> |
| </kill> |
| |
| <end name="end" /> |
| </workflow-app> |
| </verbatim> |
| |
| ---++++ File Upload Script |
The script copies the file from hadoop, rsyncs it to /tmp on the remote host and deletes the file from hadoop. On success it echoes output=0, which the workflow's fileUploadDecision node reads from the captured action output:
| <verbatim> |
| #!/bin/bash |
| |
# On failure, report the exit code back to the workflow via captured output
trap 'rc=$?; echo "output=$rc"; exit $rc' ERR INT TERM

echo "Arguments: $@"
SRCFILE=$1
DESTHOST=$2

FILENAME=$(basename "$SRCFILE")
rm -f "/tmp/$FILENAME"
hadoop fs -copyToLocal "$SRCFILE" /tmp/
echo "Copied $SRCFILE to /tmp"

rsync -ztv --rsh=ssh --stats "/tmp/$FILENAME" "$DESTHOST:/tmp"
echo "rsynced $FILENAME to $DESTHOST:/tmp"

hadoop fs -rm -r "$SRCFILE"
echo "Deleted $SRCFILE"

rm -f "/tmp/$FILENAME"
echo "output=0"
| </verbatim> |