| <noautolink> |
| |
| [[index][::Go back to Oozie Documentation Index::]] |
| |
| ----- |
| |
| ---+!! Oozie Spark Action Extension |
| |
| %TOC% |
| |
| ---++ Spark Action |
| |
| The =spark= action runs a Spark job. |
| |
| The workflow job will wait until the Spark job completes before |
| continuing to the next action. |
| |
| To run the Spark job, you have to configure the =spark= action with the |
| =job-tracker=, =name-node= and Spark =master= elements, as well as any |
| other required elements, arguments and configuration. |
| |
| Spark options can be specified in the =spark-opts= element. |
| |
| A =spark= action can be configured to create or delete HDFS directories |
| before starting the Spark job. |
| |
| Oozie EL expressions can be used in the inline configuration. Property |
| values specified in the =configuration= element override values specified |
| in the =job-xml= file. |
| |
| *Syntax:* |
| |
| <verbatim> |
| <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3"> |
| ... |
| <action name="[NODE-NAME]"> |
| <spark xmlns="uri:oozie:spark-action:0.1"> |
| <job-tracker>[JOB-TRACKER]</job-tracker> |
| <name-node>[NAME-NODE]</name-node> |
| <prepare> |
| <delete path="[PATH]"/> |
| ... |
| <mkdir path="[PATH]"/> |
| ... |
| </prepare> |
| <job-xml>[SPARK SETTINGS FILE]</job-xml> |
| <configuration> |
| <property> |
| <name>[PROPERTY-NAME]</name> |
| <value>[PROPERTY-VALUE]</value> |
| </property> |
| ... |
| </configuration> |
| <master>[SPARK MASTER URL]</master> |
| <mode>[SPARK MODE]</mode> |
| <name>[SPARK JOB NAME]</name> |
| <class>[SPARK MAIN CLASS]</class> |
| <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar> |
| <spark-opts>[SPARK-OPTIONS]</spark-opts> |
| <arg>[ARG-VALUE]</arg> |
| ... |
| <arg>[ARG-VALUE]</arg> |
| ... |
| </spark> |
| <ok to="[NODE-NAME]"/> |
| <error to="[NODE-NAME]"/> |
| </action> |
| ... |
| </workflow-app> |
| </verbatim> |
| |
| The =prepare= element, if present, indicates a list of paths to delete |
| or create before starting the job. Specified paths must start with =hdfs://HOST:PORT=. |
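| |
| As a minimal sketch of the =prepare= element, the fragment below deletes an old output directory and creates a staging directory |
| before the job starts. The host =namenode.example.com=, the port =8020= and both paths are hypothetical placeholders. |
| |
| <verbatim> |
| <prepare> |
| <!-- Hypothetical paths. Per the schema in the appendix, delete elements come before mkdir elements. --> |
| <delete path="hdfs://namenode.example.com:8020/user/joe/spark/output"/> |
| <mkdir path="hdfs://namenode.example.com:8020/user/joe/spark/staging"/> |
| </prepare> |
| </verbatim> |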
| |
| The =job-xml= element, if present, specifies a file containing configuration |
| for the Spark job. Multiple =job-xml= elements are allowed in order to |
| specify multiple =job.xml= files. |
| |
| The =configuration= element, if present, contains configuration |
| properties that are passed to the Spark job. |
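| |
| The following sketch combines two =job-xml= files with an inline =configuration= element. The file names =spark-site-settings.xml= |
| and =team-overrides.xml= are assumptions made for illustration. As stated earlier, a value given inline in =configuration= |
| overrides a value for the same property coming from a =job-xml= file. |
| |
| <verbatim> |
| <spark xmlns="uri:oozie:spark-action:0.1"> |
| <job-tracker>[JOB-TRACKER]</job-tracker> |
| <name-node>[NAME-NODE]</name-node> |
| <!-- Hypothetical settings files --> |
| <job-xml>spark-site-settings.xml</job-xml> |
| <job-xml>team-overrides.xml</job-xml> |
| <configuration> |
| <!-- Overrides the same property if it is set in either job-xml file --> |
| <property> |
| <name>mapred.compress.map.output</name> |
| <value>true</value> |
| </property> |
| </configuration> |
| ... |
| </spark> |
| </verbatim> |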
| |
| The =master= element indicates the URL of the Spark master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-client, |
| or local. |
| |
| The =mode= element, if present, indicates where the Spark driver program runs. Ex: client, cluster. It is typically |
| not required because the mode can be specified as part of =master= (e.g. master=yarn, mode=client is equivalent to master=yarn-client). |
| A local =master= always runs in client mode. |
| |
| Depending on the =master= (and =mode=) entered, the Spark job runs differently, as follows: |
| * local mode: everything runs in the Launcher Job. |
| * yarn-client mode: the driver runs in the Launcher Job and the executors run in YARN. |
| * yarn-cluster mode: the driver and the executors run in YARN. |
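| |
| To illustrate the equivalence between =master= plus =mode= and a combined =master= value, the two fragments below are |
| alternative ways to request yarn-client mode. This is only a sketch of the two spellings described above. Use one form or the other, |
| not both in the same action. |
| |
| <verbatim> |
| <!-- Form 1: master and mode given separately --> |
| <master>yarn</master> |
| <mode>client</mode> |
| |
| <!-- Form 2: mode folded into master, so the mode element is omitted --> |
| <master>yarn-client</master> |
| </verbatim> |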
| |
| The =name= element indicates the name of the Spark application. |
| |
| The =class= element, if present, indicates the main class of the Spark application. |
| |
| The =jar= element indicates a comma-separated list of jars or Python files. |
| |
| The =spark-opts= element, if present, contains a list of Spark options that are passed to the Spark driver. Spark configuration |
| options can be passed by specifying '--conf key=value' here, or from =oozie.service.SparkConfigurationService.spark.configurations= |
| in oozie-site.xml. Values containing whitespace can be enclosed in double quotes. When both are set, the =spark-opts= configs take priority. |
| |
| Some examples of the =spark-opts= element: |
| * '--conf key=value' |
| * '--conf key1=value1 value2' |
| * '--conf key1="value1 value2"' |
| * '--conf key1=value1 key2="value2 value3"' |
| * '--conf key=value --verbose' |
| |
| The =arg= element, if present, contains an argument that is passed to the Spark application. Multiple =arg= elements are allowed. |
| |
| All the above elements can be parameterized (templatized) using EL |
| expressions. |
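| |
| A parameterized action might look like the sketch below. All property names (=jobTracker=, =nameNode=, =sparkMaster=, |
| =sparkJobName=, =sparkMainClass=, =sparkJar=, =sparkOpts=, =inputDir=) are hypothetical. Their values would have to be supplied |
| when the workflow job is submitted, for example in the job properties. |
| |
| <verbatim> |
| <spark xmlns="uri:oozie:spark-action:0.1"> |
| <!-- Every ${...} name below is an assumed workflow parameter --> |
| <job-tracker>${jobTracker}</job-tracker> |
| <name-node>${nameNode}</name-node> |
| <master>${sparkMaster}</master> |
| <name>${sparkJobName}</name> |
| <class>${sparkMainClass}</class> |
| <jar>${sparkJar}</jar> |
| <spark-opts>${sparkOpts}</spark-opts> |
| <arg>${inputDir}</arg> |
| </spark> |
| </verbatim> |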
| |
| *Example:* |
| |
| <verbatim> |
| <workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1"> |
| ... |
| <action name="myfirstsparkjob"> |
| <spark xmlns="uri:oozie:spark-action:0.1"> |
| <job-tracker>foo:8021</job-tracker> |
| <name-node>bar:8020</name-node> |
| <prepare> |
| <delete path="${jobOutput}"/> |
| </prepare> |
| <configuration> |
| <property> |
| <name>mapred.compress.map.output</name> |
| <value>true</value> |
| </property> |
| </configuration> |
| <master>local[*]</master> |
| <mode>client</mode> |
| <name>Spark Example</name> |
| <class>org.apache.spark.examples.mllib.JavaALS</class> |
| <jar>/lib/spark-examples_2.10-1.1.0.jar</jar> |
| <spark-opts>--executor-memory 20G --num-executors 50 |
| --conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts> |
| <arg>inputpath=hdfs://localhost/input/file.txt</arg> |
| <arg>value=2</arg> |
| </spark> |
| <ok to="myotherjob"/> |
| <error to="errorcleanup"/> |
| </action> |
| ... |
| </workflow-app> |
| </verbatim> |
| |
| ---+++ Spark Action Logging |
| |
| Spark action logs are redirected to the STDOUT/STDERR of the Oozie Launcher map-reduce job task that runs Spark. |
| |
| In the Oozie web-console, the 'Console URL' link in the Spark action pop-up leads, via the Hadoop job-tracker web-console, |
| to the Oozie Launcher map-reduce job task logs. |
| |
| ---+++ Spark on YARN |
| |
| To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties |
| either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml. |
| |
| 1. spark.yarn.historyServer.address=SPH-HOST:18088 |
| |
| 2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory |
| |
| 3. spark.eventLog.enabled=true |
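| |
| Passed through =spark-opts=, the three properties could be written as in the sketch below. =SPH-HOST= and =NN= are the same |
| placeholders used in the list above and must be replaced with the hosts of your cluster. |
| |
| <verbatim> |
| <spark-opts>--conf spark.yarn.historyServer.address=SPH-HOST:18088 |
| --conf spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory |
| --conf spark.eventLog.enabled=true</spark-opts> |
| </verbatim> |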
| |
| ---+++ PySpark with Spark Action |
| |
| To submit PySpark scripts with the Spark action, the PySpark dependencies must be available in the sharelib or in the workflow's lib/ directory. |
| For more information, please refer to the [[AG_Install#Oozie_Share_Lib][installation document]]. |
| |
| *Example:* |
| |
| <verbatim> |
| <workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1"> |
| ... |
| <action name="myfirstpysparkjob"> |
| <spark xmlns="uri:oozie:spark-action:0.1"> |
| <job-tracker>foo:8021</job-tracker> |
| <name-node>bar:8020</name-node> |
| <prepare> |
| <delete path="${jobOutput}"/> |
| </prepare> |
| <configuration> |
| <property> |
| <name>mapred.compress.map.output</name> |
| <value>true</value> |
| </property> |
| </configuration> |
| <master>yarn-cluster</master> |
| <name>Spark Example</name> |
| <jar>pi.py</jar> |
| <spark-opts>--executor-memory 20G --num-executors 50 |
| --conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts> |
| <arg>100</arg> |
| </spark> |
| <ok to="myotherjob"/> |
| <error to="errorcleanup"/> |
| </action> |
| ... |
| </workflow-app> |
| </verbatim> |
| |
| The =jar= element indicates the Python file. Refer to the file by its localized name, because only local files are allowed |
| in PySpark. The py file should be in the lib/ folder next to the workflow.xml, or added using the =file= element, so that |
| it is localized to the working directory with just its name. |
| |
| ---+++ Using Symlink in <jar> |
| |
| A symlink must be specified using the =[[WorkflowFunctionalSpec#a3.2.2.1_Adding_Files_and_Archives_for_the_Job][file]]= element. Then, you can use |
| the symlink name in the =jar= element. |
| |
| *Example:* |
| |
| Specifying relative path for symlink: |
| |
| Make sure that the file is within the application directory, i.e. =oozie.wf.application.path=. |
| <verbatim> |
| <spark xmlns="uri:oozie:spark-action:0.2"> |
| ... |
| <jar>py-spark-example-symlink.py</jar> |
| ... |
| ... |
| <file>py-spark.py#py-spark-example-symlink.py</file> |
| ... |
| </spark> |
| </verbatim> |
| |
| Specifying full path for symlink: |
| <verbatim> |
| <spark xmlns="uri:oozie:spark-action:0.2"> |
| ... |
| <jar>spark-example-symlink.jar</jar> |
| ... |
| ... |
| <file>hdfs://localhost:8020/user/testjars/all-oozie-examples.jar#spark-example-symlink.jar</file> |
| ... |
| </spark> |
| </verbatim> |
| |
| |
| |
| ---++ Appendix, Spark XML-Schema |
| |
| ---+++ AE.A Appendix A, Spark XML-Schema |
| |
| ---++++ Spark Action Schema Version 0.1 |
| <verbatim> |
| <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" |
| xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified" |
| targetNamespace="uri:oozie:spark-action:0.1"> |
| |
| <xs:element name="spark" type="spark:ACTION"/> |
| |
| <xs:complexType name="ACTION"> |
| <xs:sequence> |
| <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/> |
| <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/> |
| <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> |
| <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/> |
| <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/> |
| <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/> |
| <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> |
| </xs:sequence> |
| </xs:complexType> |
| |
| <xs:complexType name="CONFIGURATION"> |
| <xs:sequence> |
| <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> |
| <xs:complexType> |
| <xs:sequence> |
| <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> |
| <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/> |
| <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> |
| </xs:sequence> |
| </xs:complexType> |
| </xs:element> |
| </xs:sequence> |
| </xs:complexType> |
| |
| <xs:complexType name="PREPARE"> |
| <xs:sequence> |
| <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/> |
| <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/> |
| </xs:sequence> |
| </xs:complexType> |
| |
| <xs:complexType name="DELETE"> |
| <xs:attribute name="path" type="xs:string" use="required"/> |
| </xs:complexType> |
| |
| <xs:complexType name="MKDIR"> |
| <xs:attribute name="path" type="xs:string" use="required"/> |
| </xs:complexType> |
| |
| </xs:schema> |
| </verbatim> |
| |
| ---++++ Spark Action Schema Version 0.2 |
| <verbatim> |
| <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" |
| xmlns:spark="uri:oozie:spark-action:0.2" elementFormDefault="qualified" |
| targetNamespace="uri:oozie:spark-action:0.2"> |
| |
| <xs:element name="spark" type="spark:ACTION"/> |
| |
| <xs:complexType name="ACTION"> |
| <xs:sequence> |
| <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> |
| <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/> |
| <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/> |
| <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/> |
| <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/> |
| <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> |
| <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> |
| <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/> |
| </xs:sequence> |
| </xs:complexType> |
| |
| <xs:complexType name="CONFIGURATION"> |
| <xs:sequence> |
| <xs:element name="property" minOccurs="1" maxOccurs="unbounded"> |
| <xs:complexType> |
| <xs:sequence> |
| <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/> |
| <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/> |
| <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/> |
| </xs:sequence> |
| </xs:complexType> |
| </xs:element> |
| </xs:sequence> |
| </xs:complexType> |
| |
| <xs:complexType name="PREPARE"> |
| <xs:sequence> |
| <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/> |
| <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/> |
| </xs:sequence> |
| </xs:complexType> |
| |
| <xs:complexType name="DELETE"> |
| <xs:attribute name="path" type="xs:string" use="required"/> |
| </xs:complexType> |
| |
| <xs:complexType name="MKDIR"> |
| <xs:attribute name="path" type="xs:string" use="required"/> |
| </xs:complexType> |
| |
| </xs:schema> |
| </verbatim> |
| [[index][::Go back to Oozie Documentation Index::]] |
| |
| </noautolink> |
| |
| |
| |