layout: global
title: Running Spark on YARN
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0
Support for running on YARN (Hadoop NextGen) was added to Spark in version 0.6.0, and improved in subsequent releases.
Security in Spark is OFF by default. This could mean you are vulnerable to attack by default. Please see Spark Security and the specific security sections in this doc before running Spark.
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration. If the configuration references Java system properties or environment variables not managed by YARN, they should also be set in the Spark application's configuration (driver, executors, and the AM when running in client mode).
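As a quick sanity check before submitting, the requirement above can be sketched in shell. This is a minimal sketch, not part of Spark: the helper name has_hadoop_client_conf and the fallback path /etc/hadoop/conf are hypothetical, and the two file names checked are only the conventional Hadoop client files.

```shell
#!/usr/bin/env sh
# Sketch: confirm a directory looks like a Hadoop client configuration
# directory before pointing HADOOP_CONF_DIR at it.
# Assumption: core-site.xml and yarn-site.xml are the files to look for.
has_hadoop_client_conf() {
  dir="$1"
  [ -d "$dir" ] && [ -f "$dir/core-site.xml" ] && [ -f "$dir/yarn-site.xml" ]
}

# /etc/hadoop/conf is a hypothetical default; use your cluster's location.
candidate="${HADOOP_CONF_DIR:-/etc/hadoop/conf}"
if has_hadoop_client_conf "$candidate"; then
  export HADOOP_CONF_DIR="$candidate"
  echo "Using Hadoop client configuration in $HADOOP_CONF_DIR"
else
  echo "No core-site.xml and yarn-site.xml found in $candidate" >&2
fi
```

The check only verifies that the files exist; it does not validate their contents or connectivity to the ResourceManager.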
There are two deploy modes that can be used to launch Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
Unlike other cluster managers supported by Spark in which the master's address is specified in the --master parameter, in YARN mode the ResourceManager's address is picked up from the Hadoop configuration. Thus, the --master parameter is yarn.
To launch a Spark application in cluster mode:
$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
For example:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10
The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the Debugging your Application section below for how to see driver and executor logs.
To launch a Spark application in client mode, do the same, but replace cluster with client. The following shows how you can run spark-shell in client mode:
$ ./bin/spark-shell --master yarn --deploy-mode client
In cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar won't work out of the box with files that are local to the client. To make files on the client available to SparkContext.addJar, include them with the --jars option in the launch command.
$ ./bin/spark-submit --class my.main.Class \
    --master yarn \
    --deploy-mode cluster \
    --jars my-other-jar.jar,my-other-other-jar.jar \
    my-main-jar.jar \
    app_arg1 app_arg2
Running Spark on YARN requires a binary distribution of Spark which is built with YARN support. Binary distributions can be downloaded from the downloads page of the project website. There are two variants of Spark binary distributions you can download. One is pre-built with a certain version of Apache Hadoop; because this distribution contains a built-in Hadoop runtime, we call it the with-hadoop Spark distribution. The other is pre-built with user-provided Hadoop; because this distribution doesn't contain a built-in Hadoop runtime, it's smaller, but users have to provide a Hadoop installation separately. We call this variant the no-hadoop Spark distribution.

For the with-hadoop Spark distribution, since it already contains a built-in Hadoop runtime, Spark by default does not populate YARN's classpath when a job is submitted to a Hadoop YARN cluster, in order to prevent jar conflicts. To override this behavior, set spark.yarn.populateHadoopClasspath=true. For the no-hadoop Spark distribution, Spark populates YARN's classpath by default in order to get the Hadoop runtime. With the with-hadoop Spark distribution, if your application depends on a library that is only available in the cluster, you can try to populate the YARN classpath by setting the property mentioned above. If you run into jar conflicts by doing so, turn it off and include the library in your application jar.
To build Spark yourself, refer to Building Spark.
To make Spark runtime jars accessible from the YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME/jars and upload it to the distributed cache.
Most of the configs are the same for Spark on YARN as for other deployment modes. See the configuration page for more information on those. These are configs that are specific to Spark on YARN.
In YARN terminology, executors and application masters run inside "containers". YARN has two modes for handling container logs after an application has completed. If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted on the local machine. These logs can be viewed from anywhere on the cluster with the yarn logs command.
yarn logs -applicationId <app ID>
will print out the contents of all log files from all containers from the given application. You can also view the container log files directly in HDFS using the HDFS shell or API. The directory where they are located can be found by looking at your YARN configs (yarn.nodemanager.remote-app-log-dir and yarn.nodemanager.remote-app-log-dir-suffix). The logs are also available on the Spark Web UI under the Executors Tab. You need to have both the Spark history server and the MapReduce history server running and configure yarn.log.server.url in yarn-site.xml properly. The log URL on the Spark history server UI will redirect you to the MapReduce history server to show the aggregated logs.
When log aggregation isn't turned on, logs are retained locally on each machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation. Viewing logs for a container requires going to the host that contains them and looking in this directory. Subdirectories organize log files by application ID and container ID. The logs are also available on the Spark Web UI under the Executors Tab; viewing them there does not require running the MapReduce history server.
To review the per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers are launched. This directory contains the launch script, JARs, and all environment variables used for launching each container. This process is useful for debugging classpath problems in particular. (Note that enabling this requires admin privileges on cluster settings and a restart of all node managers. Thus, this is not applicable to hosted clusters.)
To use a custom log4j configuration for the application master or executors, here are the options:

- Upload a custom log4j.properties using spark-submit, by adding it to the --files list of files to be uploaded with the application.
- Add -Dlog4j.configuration=<location of configuration file> to spark.driver.extraJavaOptions (for the driver) or spark.executor.extraJavaOptions (for executors). Note that if using a file, the file: protocol should be explicitly provided, and the file needs to exist locally on all the nodes.
- Update the $SPARK_CONF_DIR/log4j.properties file and it will be automatically uploaded along with the other configurations. Note that the other two options have higher priority than this option if multiple options are specified.

Note that for the first option, both executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).
If you need a reference to the proper location to put log files in YARN so that YARN can properly display and aggregate them, use spark.yarn.app.container.log.dir in your log4j.properties. For example, log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log. For streaming applications, configuring RollingFileAppender and setting the file location to YARN's log directory will avoid disk overflow caused by large log files, and logs can be accessed using YARN's log utility.
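The rolling-file setup described above can be sketched as a shell snippet that writes a log4j.properties file. This is a sketch under assumptions: the appender name file_appender follows the example in the text, while the size limit, backup count, and layout pattern are illustrative values that are not taken from this page.

```shell
#!/usr/bin/env sh
# Sketch: write a log4j.properties that rolls logs inside YARN's container
# log directory. MaxFileSize, MaxBackupIndex and the pattern are assumptions.
log4j_file="$(mktemp -d)/log4j.properties"

# The quoted 'EOF' stops the shell from expanding
# ${spark.yarn.app.container.log.dir}; log4j must receive it unchanged.
cat > "$log4j_file" <<'EOF'
log4j.rootLogger=INFO, file_appender
log4j.appender.file_appender=org.apache.log4j.RollingFileAppender
log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.file_appender.MaxFileSize=50MB
log4j.appender.file_appender.MaxBackupIndex=5
log4j.appender.file_appender.layout=org.apache.log4j.PatternLayout
log4j.appender.file_appender.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
EOF

echo "Wrote $log4j_file"
# The file could then be shipped with the application, for example:
#   ./bin/spark-submit --master yarn --deploy-mode cluster \
#       --files "$log4j_file" [options] <app jar> [app options]
```

Capping the file size and the number of backups is what keeps a long-running streaming job from filling the local disk.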
To use a custom metrics.properties for the application master and executors, update the $SPARK_CONF_DIR/metrics.properties file. It will automatically be uploaded with other configurations, so you don't need to specify it manually with --files.
For example, suppose you would like the log URL link to point directly to the Job History Server instead of letting the NodeManager HTTP server redirect it. You can configure spark.history.custom.executor.log.url as below:
{{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096
NOTE: you need to replace <JHS_HOST> and <JHS_PORT> with actual values.
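Filling in the two placeholders can be sketched with a small shell helper. The function name build_executor_log_url is hypothetical, jhs.example.com is a made-up host, and port 19888 is an assumed value; check the Job History Server web address configured for your cluster. The {{...}} patterns are left untouched so that Spark can substitute them.

```shell
#!/usr/bin/env sh
# Sketch: fill <JHS_HOST> and <JHS_PORT> in the custom executor log URL.
# Only the host and port are substituted; the {{...}} patterns stay as-is.
build_executor_log_url() {
  jhs_host="$1"
  jhs_port="$2"
  printf '%s\n' "{{HTTP_SCHEME}}${jhs_host}:${jhs_port}/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096"
}

# Hypothetical host and assumed port, for illustration only.
url="$(build_executor_log_url jhs.example.com 19888)"
echo "spark.history.custom.executor.log.url $url"
```

The printed line is in the key-value form used by the Spark properties file.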
Please make sure to have read the Custom Resource Scheduling and Configuration Overview section on the configuration page. This section only talks about the YARN specific aspects of resource scheduling.
YARN needs to be configured to support any resources the user wants to use with Spark. Resource scheduling on YARN was added in YARN 3.1.0. See the YARN documentation for more information on configuring resources and properly setting up isolation. Ideally the resources are set up in isolation so that an executor can only see the resources it was allocated. If you do not have isolation enabled, the user is responsible for creating a discovery script that ensures the resource is not shared between executors.
YARN currently supports any user-defined resource type but has built-in types for GPU (yarn.io/gpu) and FPGA (yarn.io/fpga). For that reason, if you are using either of those resources, Spark can translate your request for Spark resources into YARN resources and you only have to specify the spark.{driver/executor}.resource. configs. If you are using a resource other than FPGA or GPU, you are responsible for specifying the configs for both YARN (spark.yarn.{driver/executor}.resource.) and Spark (spark.{driver/executor}.resource.).
For example, the user wants to request 2 GPUs for each executor. The user can just specify spark.executor.resource.gpu.amount=2 and Spark will handle requesting yarn.io/gpu resource type from YARN.
If the user has a user-defined YARN resource, let's call it acceleratorX, then the user must specify both spark.yarn.executor.resource.acceleratorX.amount=2 and spark.executor.resource.acceleratorX.amount=2.
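The rule in this section (GPU and FPGA need only the Spark config, while any other resource needs both the YARN and the Spark config) can be sketched as a shell helper that prints the matching --conf flags. The function name resource_conf_flags is hypothetical; it only prints text and does not contact YARN or validate that the resource exists.

```shell
#!/usr/bin/env sh
# Sketch: print the executor --conf flags needed for a given resource.
# gpu and fpga are translated by Spark to YARN's built-in types, so only the
# Spark config is emitted; any other name also gets the YARN-side config.
resource_conf_flags() {
  name="$1"
  amount="$2"
  case "$name" in
    gpu|fpga) : ;;
    *) printf '%s\n' "--conf spark.yarn.executor.resource.${name}.amount=${amount}" ;;
  esac
  printf '%s\n' "--conf spark.executor.resource.${name}.amount=${amount}"
}

resource_conf_flags gpu 2
resource_conf_flags acceleratorX 2
```

The first call prints a single flag, and the second prints two, one for YARN and one for Spark, matching the acceleratorX example above.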
YARN does not tell Spark the addresses of the resources allocated to each container. For that reason, the user must specify a discovery script that gets run by the executor on startup to discover what resources are available to that executor. You can find an example script in examples/src/main/scripts/getGpusResources.sh. The script must have execute permissions set, and the user should set up permissions so that malicious users cannot modify it. The script should write to STDOUT a JSON string in the format of the ResourceInformation class. This has the resource name and an array of resource addresses available to just that executor.
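The output format can be sketched with a small shell function that prints a resource name and its addresses as JSON. This is not the bundled getGpusResources.sh: the function name print_resource_json is hypothetical, and the addresses are passed in as arguments rather than discovered from hardware, which a real discovery script would have to do.

```shell
#!/usr/bin/env sh
# Sketch: emit JSON with a resource name and an array of addresses,
# the two fields a discovery script writes to STDOUT.
# Assumption: addresses arrive as arguments instead of being probed.
print_resource_json() {
  name="$1"
  shift
  addrs=""
  for a in "$@"; do
    if [ -z "$addrs" ]; then
      addrs="\"$a\""
    else
      addrs="$addrs,\"$a\""
    fi
  done
  printf '{"name": "%s", "addresses": [%s]}\n' "$name" "$addrs"
}

print_resource_json gpu 0 1
```

For the call shown, the function prints `{"name": "gpu", "addresses": ["0","1"]}`.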
Stage level scheduling is supported on YARN when dynamic allocation is enabled. One YARN-specific point to note is that each ResourceProfile requires a different container priority on YARN. The mapping is simple: the ResourceProfile id becomes the priority, and on YARN lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter, as Spark finishes one stage before starting another; the only case where it might have an effect is a job-server type scenario, so it's something to keep in mind.

Note that custom resources are handled differently in the base default profile and in custom ResourceProfiles. To allow the user to request YARN containers with extra resources without Spark scheduling on them, the user can specify resources via the spark.yarn.executor.resource. config. Those configs are only used in the base default profile, though, and do not get propagated into any other custom ResourceProfiles. This is because there would be no way to remove them if you wanted a stage to not have them. As a result, your default profile gets the custom resources defined in spark.yarn.executor.resource. plus the Spark-defined resources of GPU or FPGA. Spark converts GPU and FPGA resources into the YARN built-in types yarn.io/gpu and yarn.io/fpga, but does not know the mapping of any other resources. Any other Spark custom resources are not propagated to YARN for the default profile. So if you want Spark to schedule based on a custom resource and have it requested from YARN, you must specify it in both the YARN (spark.yarn.{driver/executor}.resource.) and Spark (spark.{driver/executor}.resource.) configs. Leave the Spark config off if you only want YARN containers with the extra resources but do not want Spark to schedule using them.

Custom ResourceProfiles currently have no way to specify only YARN resources without Spark scheduling off of them. This means that for custom ResourceProfiles all the resources defined in the ResourceProfile are propagated to YARN. GPU and FPGA are still converted to the YARN built-in types as well. This requires that the names of any custom resources you specify match what they are defined as in YARN.
- In cluster mode, the local directories used by the Spark executors and the Spark driver will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. In client mode, the Spark executors will use the local directories configured for YARN while the Spark driver will use those defined in spark.local.dir. This is because the Spark driver does not run on the YARN cluster in client mode, only the Spark executors do.
- The --files and --archives options support specifying file names with the # similar to Hadoop. For example, you can specify --files localtest.txt#appSees.txt; this uploads the file you have locally named localtest.txt into HDFS, but it will be linked to by the name appSees.txt, and your application should use the name appSees.txt to reference it when running on YARN.
- The --jars option allows the SparkContext.addJar function to work if you are using it with local files and running in cluster mode. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.

Standard Kerberos support in Spark is covered in the Security page.
In YARN mode, when accessing Hadoop file systems, aside from the default file system in the hadoop configuration, Spark will also automatically obtain delegation tokens for the service hosting the staging directory of the Spark application.
(Works also with the “local” master.)
Debugging Hadoop/Kerberos problems can be "difficult". One useful technique is to enable extra logging of Kerberos operations in Hadoop by setting the HADOOP_JAAS_DEBUG environment variable.
export HADOOP_JAAS_DEBUG=true
The JDK classes can be configured to enable extra logging of their Kerberos and SPNEGO/REST authentication via the system properties sun.security.krb5.debug and sun.security.spnego.debug:
-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true
All these options can be enabled in the Application Master:
spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true
spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true
Finally, if the log level for org.apache.spark.deploy.yarn.Client is set to DEBUG, the log will include a list of all tokens obtained, and their expiry details.
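Raising that logger to DEBUG could be done with one line in a log4j.properties file, sketched below as a shell snippet. This assumes the log4j 1.x properties syntax used elsewhere on this page; the temporary file is for illustration, and in practice the line would go into whichever log4j.properties your client actually loads.

```shell
#!/usr/bin/env sh
# Sketch: write the logger override into a scratch properties file.
# Assumption: log4j 1.x properties syntax, as in the examples on this page.
client_log4j="$(mktemp)"

cat > "$client_log4j" <<'EOF'
# List every token the YARN client obtains, with expiry details.
log4j.logger.org.apache.spark.deploy.yarn.Client=DEBUG
EOF

echo "Wrote $client_log4j"
```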
To start the Spark Shuffle Service on each NodeManager in your YARN cluster, follow these instructions:

1. Locate the spark-<version>-yarn-shuffle.jar. This should be under $SPARK_HOME/common/network-yarn/target/scala-<version> if you are building Spark yourself, and under yarn if you are using a distribution.
2. Add this jar to the classpath of all NodeManagers in your cluster.
3. In the yarn-site.xml on each node, add spark_shuffle to yarn.nodemanager.aux-services, then set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService.
4. Increase the NodeManager's heap size by setting YARN_HEAPSIZE (1000 by default) in etc/hadoop/yarn-env.sh to avoid garbage collection issues during shuffle.
5. Restart all NodeManagers in your cluster.

The following extra configuration options are available when the shuffle service is running on YARN:
Apache Oozie can launch Spark applications as part of a workflow. In a secure cluster, the launched application will need the relevant tokens to access the cluster's services. If Spark is launched with a keytab, this is automatic. However, if Spark is to be launched without a keytab, the responsibility for setting up security must be handed over to Oozie.
The details of configuring Oozie for secure clusters and obtaining credentials for a job can be found on the Oozie web site in the “Authentication” section of the specific release's documentation.
For Spark applications, the Oozie workflow must be set up for Oozie to request all tokens which the application needs, including:
To avoid Spark attempting, and then failing, to obtain Hive, HBase and remote HDFS tokens, the Spark configuration must be set to disable token collection for the services.
The Spark configuration must include the lines:
spark.security.credentials.hive.enabled false
spark.security.credentials.hbase.enabled false
The configuration option spark.kerberos.access.hadoopFileSystems must be unset.
It is possible to use the Spark History Server application page as the tracking URL for running applications when the application UI is disabled. This may be desirable on secure clusters, or to reduce the memory usage of the Spark driver. To set up tracking through the Spark History Server, do the following:
Be aware that the history server information may not be up-to-date with the application's state.