---++ Contents
* <a href="#Onboarding Steps">Onboarding Steps</a>
* <a href="#Sample Pipeline">Sample Pipeline</a>
* [[HiveIntegration][Hive Examples]]
---+++ Onboarding Steps
* Create a cluster definition specifying the name node, job tracker, workflow engine endpoint, and messaging endpoint. Refer to [[EntitySpecification][cluster definition]] for details.
* Create a feed definition for each input and output, specifying frequency, data path, and ownership. Refer to [[EntitySpecification][feed definition]] for details.
* Create a process definition for your job. The process defines the configuration for the workflow job. Important attributes are frequency, inputs/outputs, and workflow path. Refer to [[EntitySpecification][process definition]] for details.
* Define the workflow for your job using the workflow engine (only Oozie is supported as of now). Refer to [[][Oozie Workflow Specification]]. The libraries required for the workflow should be available in the lib folder under the workflow path.
* Set up the workflow definition, libraries, and referenced scripts on Hadoop.
* Submit the cluster definition.
* Submit and schedule the feed and process definitions.
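The submission steps above can be sketched with the Falcon command-line client. This is a minimal sketch, assuming the =falcon= client is on the PATH; the entity file names, the =FALCON_CMD= variable, and the =onboard_pipeline= function are hypothetical names chosen for illustration.

```shell
# FALCON_CMD is an illustrative override hook; set it to "echo" for a dry run.
FALCON_CMD="${FALCON_CMD:-falcon}"

# Submit the cluster first, then submit and schedule the feeds and the process.
# The file names are hypothetical; substitute your own entity definitions.
onboard_pipeline() {
    $FALCON_CMD entity -type cluster -submit -file cluster.xml &&
    $FALCON_CMD entity -type feed -submitAndSchedule -file sample-input-feed.xml &&
    $FALCON_CMD entity -type feed -submitAndSchedule -file sample-output-feed.xml &&
    $FALCON_CMD entity -type process -submitAndSchedule -file sample-process.xml
}
```

The order matters: feeds reference the cluster and the process references the feeds, so each entity is submitted only after the entities it depends on.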
---+++ Sample Pipeline
---++++ Cluster
The cluster locations MUST be created before the cluster entity is submitted to Falcon:
* *staging* must have 777 permissions, and its parent directories must have execute permissions.
* *working* must have 755 permissions, and its parent directories must have execute permissions.
Cluster definition that contains endpoints for the name node, job tracker, Oozie, and JMS server:
<?xml version="1.0"?>
<!-- Cluster configuration -->
<cluster colo="ua2" description="" name="corp" xmlns="uri:falcon:cluster:0.1">
    <interfaces>
        <interface type="readonly" endpoint="h" version="2.5.0" />
        <interface type="write" endpoint="hdfs://" version="2.5.0" />
        <interface type="execute" endpoint="job-tracker:54311" version="2.5.0" />
        <interface type="workflow" endpoint="" version="4.0.1" />
        <interface type="messaging" endpoint="tcp://" version="5.1.6" />
    </interfaces>
    <locations>
        <location name="staging" path="/projects/falcon/staging" />
        <location name="temp" path="/tmp" />
        <location name="working" path="/projects/falcon/working" />
    </locations>
</cluster>
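The staging and working requirements described for the cluster locations can be prepared before the cluster entity is submitted. The following is a minimal sketch, assuming the =hadoop= client is on the PATH and the calling user is allowed to change permissions on these paths; =HADOOP_CMD= and =prepare_cluster_locations= are illustrative names, not part of Falcon.

```shell
# HADOOP_CMD is an illustrative override hook; set it to "echo" for a dry run.
HADOOP_CMD="${HADOOP_CMD:-hadoop}"

# Create the staging and working locations with the permissions Falcon expects.
prepare_cluster_locations() {
    staging="$1"
    working="$2"
    $HADOOP_CMD fs -mkdir -p "$staging" "$working" &&
    $HADOOP_CMD fs -chmod 777 "$staging" &&
    $HADOOP_CMD fs -chmod 755 "$working"
}

# Example: prepare_cluster_locations /projects/falcon/staging /projects/falcon/working
```

The sketch does not touch the parent directories; their execute permissions still need to be checked separately.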
---++++ Input Feed
Hourly feed that defines feed path, frequency, ownership and validity:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Hourly sample input data -->
<feed description="sample input data" name="SampleInput" xmlns="uri:falcon:feed:0.1">
    <frequency>hours(1)</frequency>
    <late-arrival cut-off="hours(6)" />
    <clusters>
        <cluster name="corp" type="source">
            <validity start="2009-01-01T00:00Z" end="2099-12-31T00:00Z" timezone="UTC" />
            <retention limit="months(24)" action="delete" />
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/projects/bootcamp/data/${YEAR}-${MONTH}-${DAY}-${HOUR}/SampleInput" />
        <location type="stats" path="/projects/bootcamp/stats/SampleInput" />
        <location type="meta" path="/projects/bootcamp/meta/SampleInput" />
    </locations>
    <ACL owner="suser" group="users" permission="0755" />
    <schema location="/none" provider="none" />
</feed>
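The data location of a feed is a template: for each feed instance, the =${YEAR}=, =${MONTH}=, =${DAY}= and =${HOUR}= variables are replaced with the zero-padded fields of the instance time. The sketch below mimics that substitution locally so a path template can be checked by hand; =feed_instance_path= is a hypothetical helper, not a Falcon command, and it assumes instance times written as =YYYY-MM-DDTHH:MMZ=.

```shell
# Resolve a feed location template for one instance time (format: YYYY-MM-DDTHH:MMZ).
feed_instance_path() {
    template="$1"
    instant="$2"
    # Pick the date fields out of the fixed-width timestamp.
    year=$(printf '%s' "$instant" | cut -c1-4)
    month=$(printf '%s' "$instant" | cut -c6-7)
    day=$(printf '%s' "$instant" | cut -c9-10)
    hour=$(printf '%s' "$instant" | cut -c12-13)
    # Replace each literal ${NAME} placeholder with its value.
    printf '%s\n' "$template" | sed \
        -e 's/[$]{YEAR}/'"$year"'/g' \
        -e 's/[$]{MONTH}/'"$month"'/g' \
        -e 's/[$]{DAY}/'"$day"'/g' \
        -e 's/[$]{HOUR}/'"$hour"'/g'
}

# prints /projects/bootcamp/data/2012-04-02-05/SampleInput
feed_instance_path '/projects/bootcamp/data/${YEAR}-${MONTH}-${DAY}-${HOUR}/SampleInput' '2012-04-02T05:00Z'
```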
---++++ Output Feed
Daily feed that defines feed path, frequency, ownership and validity:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Daily sample output data -->
<feed description="sample output data" name="SampleOutput" xmlns="uri:falcon:feed:0.1">
    <frequency>days(1)</frequency>
    <late-arrival cut-off="hours(6)" />
    <clusters>
        <cluster name="corp" type="source">
            <validity start="2009-01-01T00:00Z" end="2099-12-31T00:00Z" timezone="UTC" />
            <retention limit="months(24)" action="delete" />
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/projects/bootcamp/output/${YEAR}-${MONTH}-${DAY}/SampleOutput" />
        <location type="stats" path="/projects/bootcamp/stats/SampleOutput" />
        <location type="meta" path="/projects/bootcamp/meta/SampleOutput" />
    </locations>
    <ACL owner="suser" group="users" permission="0755" />
    <schema location="/none" provider="none" />
</feed>
---++++ Process
Sample process that runs daily at the 6th hour on the corp cluster. It takes one input, !SampleInput, for the previous day (24 hourly instances), and generates one output, !SampleOutput, for the previous day. The workflow is defined at /projects/bootcamp/workflow/workflow.xml, and any libraries required by the workflow should be at /projects/bootcamp/workflow/lib. The process also defines properties, including queueName and fileTimestamp, which are passed to the workflow. In addition, Falcon exposes the following properties to the workflow: nameNode and jobTracker (Hadoop properties), and input and output (input/output properties).
<?xml version="1.0" encoding="UTF-8"?>
<!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
<process name="SampleProcess">
    <cluster name="corp" />
    <frequency>days(1)</frequency>
    <validity start="2012-04-03T06:00Z" end="2022-12-30T00:00Z" timezone="UTC" />
    <inputs>
        <input name="input" feed="SampleInput" start="yesterday(0,0)" end="today(-1,0)" />
    </inputs>
    <outputs>
        <output name="output" feed="SampleOutput" instance="yesterday(0,0)" />
    </outputs>
    <properties>
        <property name="queueName" value="reports" />
        <property name="" value="" />
        <property name="fileTimestamp" value="${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}" />
    </properties>
    <workflow engine="oozie" path="/projects/bootcamp/workflow" />
    <retry policy="periodic" delay="minutes(5)" attempts="3" />
    <late-process policy="exp-backoff" delay="hours(1)">
        <late-input input="input" workflow-path="/projects/bootcamp/workflow/lateinput" />
    </late-process>
</process>
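The input window runs from =yesterday(0,0)= (00:00 of the previous day) to =today(-1,0)= (one hour before today's midnight, that is 23:00 of the previous day), which on an hourly feed covers 24 instances. As a rough illustration, the sketch below lists the !SampleInput data paths that one run would read; =input_instance_paths= is a hypothetical helper that takes the previous day's date, and the path layout is copied from the !SampleInput data location.

```shell
# List the 24 hourly SampleInput paths for one day (argument: YYYY-MM-DD).
input_instance_paths() {
    day="$1"
    hour=0
    while [ "$hour" -lt 24 ]; do
        # Hours are zero-padded to match the ${HOUR} field of the feed path.
        printf '/projects/bootcamp/data/%s-%02d/SampleInput\n' "$day" "$hour"
        hour=$((hour + 1))
    done
}

# Example: the run with nominal time 2012-04-03T06:00Z reads
#   input_instance_paths 2012-04-02
```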
---++++ Oozie Workflow
The sample user workflow contains 3 actions:
* Pig action - Executes pig script /projects/bootcamp/workflow/script.pig
* concatenator - Java action that concatenates part files and generates a single file
* file upload - ssh action that gets the concatenated file from hadoop and sends the file to a remote host
<workflow-app xmlns="uri:oozie:workflow:0.2" name="sample-wf">
    <start to="pig" />
    <action name="pig">
        <pig>
            <prepare>
                <delete path="${output}"/>
            </prepare>
        </pig>
        <ok to="concatenator" />
        <error to="fail" />
    </action>
    <action name="concatenator">
        <java>
            <prepare>
                <delete path="${nameNode}/projects/bootcamp/concat/data-${fileTimestamp}.csv"/>
            </prepare>
        </java>
        <ok to="fileupload" />
        <error to="fail"/>
    </action>
    <action name="fileupload">
        <ok to="fileUploadDecision" />
        <error to="fail"/>
    </action>
    <decision name="fileUploadDecision">
        <switch>
            <case to="end">
                ${wf:actionData('fileupload')['output'] == '0'}
            </case>
            <default to="fail"/>
        </switch>
    </decision>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>
---++++ File Upload Script
The script gets the file from Hadoop, rsyncs it to /tmp on the remote host, and deletes the file from Hadoop:
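#!/bin/bash
# On failure or interrupt, report a non-zero status as "output=<code>" and exit with it.
trap 'rc=$?; [ "$rc" -ne 0 ] || rc=1; echo "output=$rc"; exit "$rc"' ERR INT TERM
echo "Arguments: $*"
# Assumption: the source file and the destination host arrive as the first two arguments.
SRCFILE=$1
DESTHOST=$2
FILENAME=$(basename "$SRCFILE")
rm -f "/tmp/$FILENAME"
hadoop fs -copyToLocal "$SRCFILE" /tmp/
echo "Copied $SRCFILE to /tmp"
rsync -ztv --rsh=ssh --stats "/tmp/$FILENAME" "$DESTHOST:/tmp"
hadoop fs -rmr "$SRCFILE"
echo "Deleted $SRCFILE"
rm -f "/tmp/$FILENAME"
echo "output=0"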
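The workflow's decision node compares =wf:actionData('fileupload')['output']= with ='0'=, so the =output== line printed by the script is what decides whether the workflow ends or fails. The sketch below is one way to check that contract locally against saved script output. =captured_value= is a hypothetical helper, and it assumes the captured output is read as Java-properties-style =key=value= lines where the last occurrence of a key wins.

```shell
# Read captured stdout on stdin and print the value of the last "key=value" line.
captured_value() {
    key="$1"
    sed -n 's/^'"$key"'=//p' | tail -n 1
}

# Example: a successful run ends with "output=0", so this prints 0.
printf 'Arguments: src host\nCopied src to /tmp\noutput=0\n' | captured_value output
```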