<noautolink>
[[index][::Go back to Oozie Documentation Index::]]
-----
---+!! Oozie Spark Action Extension
%TOC%
---++ Spark Action
The =spark= action runs a Spark job.
The workflow job will wait until the Spark job completes before
continuing to the next action.
To run the Spark job, you have to configure the =spark= action with the
=job-tracker=, =name-node= and Spark =master= elements as
well as the necessary elements, arguments and configuration.
Spark options can be specified in an element called =spark-opts=.
A =spark= action can be configured to create or delete HDFS directories
before starting the Spark job.
Oozie EL expressions can be used in the inline configuration. Property
values specified in the =configuration= element override values specified
in the =job-xml= file.
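As an illustrative sketch (the property name below is a hypothetical example, not a required setting), an inline =configuration= such as the following would take precedence over the same property in the =job-xml= file:
<verbatim>
<!-- inline configuration: this value overrides any value for the
     same property that appears in the job-xml file -->
<configuration>
    <property>
        <name>spark.executor.memory</name>
        <value>2g</value>
    </property>
</configuration>
</verbatim>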
*Syntax:*
<verbatim>
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
...
<action name="[NODE-NAME]">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>[JOB-TRACKER]</job-tracker>
<name-node>[NAME-NODE]</name-node>
<prepare>
<delete path="[PATH]"/>
...
<mkdir path="[PATH]"/>
...
</prepare>
<job-xml>[SPARK SETTINGS FILE]</job-xml>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
<master>[SPARK MASTER URL]</master>
<mode>[SPARK MODE]</mode>
<name>[SPARK JOB NAME]</name>
<class>[SPARK MAIN CLASS]</class>
<jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
<spark-opts>[SPARK-OPTIONS]</spark-opts>
<arg>[ARG-VALUE]</arg>
...
<arg>[ARG-VALUE]</arg>
...
</spark>
<ok to="[NODE-NAME]"/>
<error to="[NODE-NAME]"/>
</action>
...
</workflow-app>
</verbatim>
The =prepare= element, if present, indicates a list of paths to delete
or create before starting the job. Specified paths must start with =hdfs://HOST:PORT=.
The =job-xml= element, if present, specifies a file containing configuration
for the Spark job. Multiple =job-xml= elements are allowed in order to
specify multiple =job.xml= files.
The =configuration= element, if present, contains configuration
properties that are passed to the Spark job.
The =master= element indicates the URL of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-client,
or local.
The =mode= element, if present, indicates the Spark mode, i.e. where the Spark driver program runs. Ex: client, cluster. This is typically
not required because it can be specified as part of =master= (i.e. master=yarn, mode=client is equivalent to master=yarn-client).
A local =master= always runs in client mode.
Depending on the =master= (and =mode=) entered, the Spark job will run differently as follows:
* local mode: everything runs here in the Launcher Job.
* yarn-client mode: the driver runs here in the Launcher Job and the executor in Yarn.
* yarn-cluster mode: the driver and executor run in Yarn.
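For example, assuming a YARN cluster, the following two configurations are equivalent ways of running the driver in client mode:
<verbatim>
<master>yarn-client</master>
</verbatim>
is equivalent to:
<verbatim>
<master>yarn</master>
<mode>client</mode>
</verbatim>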
The =name= element indicates the name of the Spark application.
The =class= element, if present, indicates the Spark application's main class.
The =jar= element indicates a comma-separated list of jars or Python files.
The =spark-opts= element, if present, contains a list of Spark options that are passed to the Spark driver. Spark configuration
options can be passed by specifying '--conf key=value' here, or from =oozie.service.SparkConfigurationService.spark.configurations=
in oozie-site.xml. Values containing whitespace can be enclosed in double quotes. The =spark-opts= configs take priority.
Some examples of the =spark-opts= element:
* '--conf key=value'
* '--conf key1=value1 value2'
* '--conf key1="value1 value2"'
* '--conf key1=value1 key2="value2 value3"'
* '--conf key=value --verbose'
The =arg= element, if present, contains arguments that are passed to the Spark application.
All the above elements can be parameterized (templatized) using EL
expressions.
*Example:*
<verbatim>
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
...
<action name="myfirstsparkjob">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<master>local[*]</master>
            <mode>client</mode>
<name>Spark Example</name>
<class>org.apache.spark.examples.mllib.JavaALS</class>
<jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
<spark-opts>--executor-memory 20G --num-executors 50
--conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts>
<arg>inputpath=hdfs://localhost/input/file.txt</arg>
<arg>value=2</arg>
</spark>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>
</verbatim>
---+++ Spark Action Logging
Spark action logs are redirected to the STDOUT/STDERR of the Oozie Launcher map-reduce job task that runs Spark.
From the Oozie web-console, using the 'Console URL' link in the Spark action pop-up, it is possible
to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tracker web-console.
---+++ Spark on YARN
To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties
either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml.
1. spark.yarn.historyServer.address=SPH-HOST:18088
2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
3. spark.eventLog.enabled=true
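As a sketch, these three properties can be passed via =spark-opts= as follows (SPH-HOST and NN are placeholders for your cluster's Spark History Server and NameNode hosts):
<verbatim>
<spark-opts>--conf spark.yarn.historyServer.address=SPH-HOST:18088
    --conf spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
    --conf spark.eventLog.enabled=true</spark-opts>
</verbatim>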
---+++ PySpark with Spark Action
To submit PySpark scripts with the Spark action, PySpark dependencies must be available in the sharelib or in the workflow's lib/ directory.
For more information, please refer to the [[AG_Install#Oozie_Share_Lib][installation document]].
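Assuming the Oozie sharelib (including the Spark sharelib) is installed, a job.properties sketch enabling it might look like:
<verbatim>
# enable the system sharelib so the Spark/PySpark dependencies are picked up
oozie.use.system.libpath=true
</verbatim>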
*Example:*
<verbatim>
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
....
<action name="myfirstpysparkjob">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>foo:8021</job-tracker>
<name-node>bar:8020</name-node>
<prepare>
<delete path="${jobOutput}"/>
</prepare>
<configuration>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
</configuration>
<master>yarn-cluster</master>
<name>Spark Example</name>
<jar>pi.py</jar>
<spark-opts>--executor-memory 20G --num-executors 50
--conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts>
<arg>100</arg>
</spark>
<ok to="myotherjob"/>
<error to="errorcleanup"/>
</action>
...
</workflow-app>
</verbatim>
The =jar= element indicates the Python file. Refer to the file by its localized name, because only local files are allowed
in PySpark. The .py file should be in the lib/ folder next to the workflow.xml, or added using the =file= element, so that
it is localized to the working directory with just its name.
---+++ Using Symlink in <jar>
A symlink must be specified using the
=[[WorkflowFunctionalSpec#a3.2.2.1_Adding_Files_and_Archives_for_the_Job][file]]= element. Then, you can use
the symlink name in the =jar= element.
*Example:*
Specifying relative path for symlink:
Make sure that the file is within the application directory i.e. =oozie.wf.application.path= .
<verbatim>
<spark xmlns="uri:oozie:spark-action:0.2">
...
<jar>py-spark-example-symlink.py</jar>
...
...
<file>py-spark.py#py-spark-example-symlink.py</file>
...
</spark>
</verbatim>
Specifying full path for symlink:
<verbatim>
<spark xmlns="uri:oozie:spark-action:0.2">
...
<jar>spark-example-symlink.jar</jar>
...
...
<file>hdfs://localhost:8020/user/testjars/all-oozie-examples.jar#spark-example-symlink.jar</file>
...
</spark>
</verbatim>
---++ Appendix, Spark XML-Schema
---+++ AE.A Appendix A, Spark XML-Schema
---++++ Spark Action Schema Version 0.1
<verbatim>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified"
targetNamespace="uri:oozie:spark-action:0.1">
<xs:element name="spark" type="spark:ACTION"/>
<xs:complexType name="ACTION">
<xs:sequence>
<xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
<xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
<xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="CONFIGURATION">
<xs:sequence>
<xs:element name="property" minOccurs="1" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
<xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
<xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
<xs:complexType name="PREPARE">
<xs:sequence>
<xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="DELETE">
<xs:attribute name="path" type="xs:string" use="required"/>
</xs:complexType>
<xs:complexType name="MKDIR">
<xs:attribute name="path" type="xs:string" use="required"/>
</xs:complexType>
</xs:schema>
</verbatim>
---++++ Spark Action Schema Version 0.2
<verbatim>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:spark="uri:oozie:spark-action:0.2" elementFormDefault="qualified"
targetNamespace="uri:oozie:spark-action:0.2">
<xs:element name="spark" type="spark:ACTION"/>
<xs:complexType name="ACTION">
<xs:sequence>
<xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
<xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
<xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="CONFIGURATION">
<xs:sequence>
<xs:element name="property" minOccurs="1" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
<xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
<xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
<xs:complexType name="PREPARE">
<xs:sequence>
<xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="DELETE">
<xs:attribute name="path" type="xs:string" use="required"/>
</xs:complexType>
<xs:complexType name="MKDIR">
<xs:attribute name="path" type="xs:string" use="required"/>
</xs:complexType>
</xs:schema>
</verbatim>
[[index][::Go back to Oozie Documentation Index::]]
</noautolink>