---
layout: global
displayTitle: Spark Configuration
title: Configuration
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---


Spark provides three locations to configure the system:

  • Spark properties control most application parameters and can be set by using a SparkConf object, or through Java system properties.
  • Environment variables can be used to set per-machine settings, such as the IP address, through the conf/spark-env.sh script on each node.
  • Logging can be configured through log4j2.properties.

Spark Properties

Spark properties control most application settings and are configured separately for each application. These properties can be set directly on a SparkConf passed to your SparkContext. SparkConf allows you to configure some of the common properties (e.g. master URL and application name), as well as arbitrary key-value pairs through the set() method. For example, we could initialize an application with two threads as follows:

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// "local[2]" runs Spark locally with two worker threads.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CountingSheep")
val sc = new SparkContext(conf)
{% endhighlight %}

Note that we run with local[2], meaning two threads. This represents “minimal” parallelism, which can help detect bugs that only exist when we run in a distributed context.

Note that we can have more than one thread in local mode, and in cases like Spark Streaming, we may actually require more than one thread to prevent starvation issues.

Properties that specify a time duration should be configured with a unit of time. The following formats are accepted:

25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
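The duration format above can be illustrated with a small parser. The following Python sketch is not Spark's own parsing code: the function name `duration_to_ms` is hypothetical, and treating a year as 365 days is an assumption made only for this example.

```python
import re

# Unit suffixes from the list above, expressed in milliseconds.
# Assumption of this sketch: one year is counted as 365 days.
MS_PER_UNIT = {
    "ms": 1,
    "s": 1000,
    "m": 60 * 1000,
    "min": 60 * 1000,
    "h": 60 * 60 * 1000,
    "d": 24 * 60 * 60 * 1000,
    "y": 365 * 24 * 60 * 60 * 1000,
}


def duration_to_ms(text):
    """Convert a string such as '25ms' or '10min' to milliseconds."""
    match = re.fullmatch(r"(\d+)([a-z]+)", text.strip().lower())
    if match is None or match.group(2) not in MS_PER_UNIT:
        raise ValueError("not a duration with a known unit: %r" % text)
    return int(match.group(1)) * MS_PER_UNIT[match.group(2)]
```

Note that the suffix m means minutes in a duration, whereas in a byte-size property the same suffix means mebibytes.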

Properties that specify a byte size should be configured with a unit of size. The following formats are accepted:

1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

While numbers without units are generally interpreted as bytes, a few are interpreted as KiB or MiB. See documentation of individual configuration properties. Specifying units is desirable where possible.
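As a rough illustration of the size suffixes and of why an explicit unit is safer, the Python sketch below converts a size string to bytes. It is not Spark's implementation: `size_to_bytes` and its `default_unit` parameter are hypothetical names, with `default_unit` standing in for the per-property default that applies when no unit is given.

```python
import re

# Binary multiples, matching the suffix list above.
BYTES_PER_UNIT = {}
for power, suffix in enumerate(["k", "m", "g", "t", "p"], start=1):
    BYTES_PER_UNIT[suffix] = 1024 ** power
    BYTES_PER_UNIT[suffix + "b"] = 1024 ** power
BYTES_PER_UNIT["b"] = 1


def size_to_bytes(text, default_unit="b"):
    """Convert a string such as '1g' or '512mb' to a number of bytes.

    default_unit is used only when the string carries no suffix.
    """
    match = re.fullmatch(r"(\d+)([a-z]*)", text.strip().lower())
    if match is None:
        raise ValueError("not a size string: %r" % text)
    unit = match.group(2) or default_unit
    if unit not in BYTES_PER_UNIT:
        raise ValueError("unknown size unit: %r" % unit)
    return int(match.group(1)) * BYTES_PER_UNIT[unit]
```

With this sketch, the bare value "64" yields 64 bytes under the default unit but 64 MiB when `default_unit="m"`, which is the ambiguity that writing "64m" avoids.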

Dynamically Loading Spark Properties

In some cases, you may want to avoid hard-coding certain configurations in a SparkConf. For instance, you may want to run the same application with different masters or different amounts of memory. Spark allows you to simply create an empty conf:

{% highlight scala %}
val sc = new SparkContext(new SparkConf())
{% endhighlight %}

Then, you can supply configuration values at runtime:

./bin/spark-submit \
  --name "My app" \
  --master local[4] \
  --conf spark.eventLog.enabled=false \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  myApp.jar

The Spark shell and spark-submit tool support two ways to load configurations dynamically. The first is command line options, such as --master, as shown above. spark-submit can accept any Spark property using the --conf/-c flag, but uses special flags for properties that play a part in launching the Spark application. Running ./bin/spark-submit --help will show the entire list of these options.

bin/spark-submit will also read configuration options from conf/spark-defaults.conf, in which each line consists of a key and a value separated by whitespace. For example:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

Any values specified as flags or in the properties file will be passed on to the application and merged with those specified through SparkConf. Properties set directly on the SparkConf take highest precedence, then flags passed to spark-submit or spark-shell, then options in the spark-defaults.conf file. A few configuration keys have been renamed since earlier versions of Spark; in such cases, the older key names are still accepted, but take lower precedence than any instance of the newer key.
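The precedence order described above can be modelled as a layered merge in which later layers overwrite earlier ones. This Python sketch is only a model, not Spark code: `parse_defaults` and `resolve_properties` are hypothetical helpers, the parser handles only the whitespace-separated form shown above, and the handling of renamed keys is left out.

```python
def parse_defaults(text):
    """Parse spark-defaults.conf style lines: a key, whitespace, then a value."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)
        props[parts[0]] = parts[1] if len(parts) > 1 else ""
    return props


def resolve_properties(defaults_file, submit_flags, spark_conf):
    """Merge the three sources, lowest precedence first."""
    merged = {}
    for layer in (defaults_file, submit_flags, spark_conf):
        merged.update(layer)  # a later layer wins on conflicting keys
    return merged


defaults = parse_defaults(
    "spark.master            spark://5.6.7.8:7077\n"
    "spark.executor.memory   4g\n"
)
resolved = resolve_properties(
    defaults,
    {"spark.master": "local[4]"},      # flag passed to spark-submit
    {"spark.executor.memory": "2g"},   # set directly on the SparkConf
)
```

Here `resolved` keeps the master from the command line flag and the executor memory from the SparkConf, because both outrank the defaults file.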

Spark properties can be divided mainly into two kinds. The first is related to deployment, such as “spark.driver.memory” and “spark.executor.instances”. Properties of this kind may have no effect when set programmatically through SparkConf at runtime, or their behavior may depend on the cluster manager and deploy mode you choose, so it is recommended to set them through the configuration file or spark-submit command line options. The second is mainly related to Spark runtime control, such as “spark.task.maxFailures”. Properties of this kind can be set either way.

Viewing Spark Properties

The application web UI at http://<driver>:4040 lists Spark properties in the “Environment” tab. This is a useful place to check to make sure that your properties have been set correctly. Note that only values explicitly specified through spark-defaults.conf, SparkConf, or the command line will appear. For all other configuration properties, you can assume the default value is used.

Available Properties

Most of the properties that control internal settings have reasonable default values. Some of the most common options to set are:

Application Properties

Note: spark.local.dir will be overridden by the SPARK_LOCAL_DIRS (Standalone) or LOCAL_DIRS (YARN) environment variables set by the cluster manager.

Apart from these, the following properties are also available, and may be useful in some situations:

Runtime Environment

Notes on individual properties in this section:

  • spark.driver.extraClassPath: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, set it through the --driver-class-path command line option or in your default properties file.
  • spark.driver.defaultJavaOptions: Typical uses are GC settings or other logging. It is illegal to set maximum heap size (-Xmx) settings with this option. Maximum heap size can be set with spark.driver.memory in cluster mode and through the --driver-memory command line option in client mode. In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, set it through the --driver-java-options command line option or in your default properties file.
  • spark.driver.extraJavaOptions: The same uses, heap size restriction, and client mode note apply as for spark.driver.defaultJavaOptions. spark.driver.defaultJavaOptions will be prepended to this configuration.
  • spark.driver.extraLibraryPath: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, set it through the --driver-library-path command line option or in your default properties file.
  • spark.driver.userClassPathFirst: This is used in cluster mode only.
  • spark.executor.defaultJavaOptions: Typical uses are GC settings or other logging. It is illegal to set Spark properties or maximum heap size (-Xmx) settings with this option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file used with the spark-submit script. Maximum heap size can be set with spark.executor.memory. The following symbols, if present, will be interpolated: {{APP_ID}} will be replaced by the application ID and {{EXECUTOR_ID}} will be replaced by the executor ID. For example, to enable verbose GC logging to a file named for the executor ID of the app in /tmp, pass a value of: -verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc
  • spark.executor.extraJavaOptions: The same uses, restrictions, and symbol interpolation apply as for spark.executor.defaultJavaOptions. spark.executor.defaultJavaOptions will be prepended to this configuration.
  • spark.python.profile: By default the pyspark.profiler.BasicProfiler will be used, but this can be overridden by passing a profiler class in as a parameter to the SparkContext constructor.

Shuffle Behavior

Spark UI

Filter parameters can also be specified in the configuration, by setting config entries of the form spark.<class name of filter>.param.<param name>=<value>

For example:

spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar

Compression and Serialization

Memory Management

Execution Behavior

Executor Metrics

Networking

spark.driver.bindAddress also allows a different address from the local one to be advertised to executors or external systems. This is useful, for example, when running containers with bridged networking. For this to work properly, the different ports used by the driver (RPC, block manager and UI) need to be forwarded from the container's host.

Scheduling

Barrier Execution Mode

Dynamic Allocation

Thread Configurations

Depending on jobs and cluster configurations, we can set the number of threads in several places in Spark to utilize available resources efficiently and get better performance. Prior to Spark 3.0, these thread configurations applied to all roles of Spark, such as driver, executor, worker and master. From Spark 3.0, we can configure threads at a finer granularity, starting with the driver and executor. Take the RPC module as an example in the table below. For other modules, like shuffle, just replace “rpc” with “shuffle” in the property names, except for spark.{driver|executor}.rpc.netty.dispatcher.numThreads, which applies only to the RPC module.

The default value for each thread-count config key is the number of cores requested for the driver or executor or, in the absence of that value, the number of cores available to the JVM, in both cases capped at a hardcoded upper limit of 8.
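The default described above amounts to a capped minimum. The Python sketch below is a model of that rule only, not Spark's code: `default_num_threads` and `MAX_DEFAULT_THREADS` are hypothetical names, and `os.cpu_count()` stands in for the number of cores available to the JVM.

```python
import os

MAX_DEFAULT_THREADS = 8  # the hardcoded upper limit mentioned above


def default_num_threads(requested_cores=None):
    """Model of the default thread count for a driver or executor.

    requested_cores: cores requested for the role, or None if not set.
    """
    if requested_cores is not None and requested_cores > 0:
        available = requested_cores
    else:
        # Fall back to the cores visible to this process.
        available = os.cpu_count() or 1
    return min(available, MAX_DEFAULT_THREADS)
```

For example, an executor that requests 4 cores would default to 4 threads under this model, while one that requests 32 cores would default to 8.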

Spark Connect

Server Configuration

Server configurations are set on the Spark Connect server, for example, when you start it with ./sbin/start-connect-server.sh. They are typically set via the config file and command-line options with --conf/-c.

Security

Please refer to the Security page for available options on how to secure different Spark subsystems.

Spark SQL

Runtime SQL Configuration

Runtime SQL configurations are per-session, mutable Spark SQL configurations. Their initial values can be set through the config file and command-line options prefixed with --conf/-c, or by setting them on the SparkConf used to create the SparkSession. They can also be set and queried with SET commands and reset to their initial values with the RESET command, or through SparkSession.conf's setter and getter methods at runtime.

{% include_api_gen generated-runtime-sql-config-table.html %}

Static SQL Configuration

Static SQL configurations are cross-session, immutable Spark SQL configurations. Their final values can be set through the config file and command-line options prefixed with --conf/-c, or by setting them on the SparkConf used to create the SparkSession. External users can query the static SQL config values via SparkSession.conf or via the SET command, e.g. SET spark.sql.extensions;, but cannot set or unset them.
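The difference between runtime and static SQL configurations can be summarised with a toy model. The Python class below is purely illustrative and is not the SparkSession.conf API: `SessionConfModel` and its methods are hypothetical, and spark.sql.shuffle.partitions is used only as an example of a runtime key.

```python
class SessionConfModel:
    """Toy model: runtime keys are mutable and resettable, static keys are read-only."""

    def __init__(self, initial, static_keys):
        self._initial = dict(initial)      # values fixed at session creation
        self._current = dict(initial)
        self._static_keys = frozenset(static_keys)

    def get(self, key):
        # Both runtime and static keys can be queried.
        return self._current[key]

    def set(self, key, value):
        if key in self._static_keys:
            raise ValueError("cannot modify static config: " + key)
        self._current[key] = value

    def reset(self):
        # Restore every runtime key to its initial value.
        self._current = dict(self._initial)


conf = SessionConfModel(
    initial={"spark.sql.shuffle.partitions": "200", "spark.sql.extensions": ""},
    static_keys={"spark.sql.extensions"},
)
conf.set("spark.sql.shuffle.partitions", "64")   # allowed: runtime key
value_after_set = conf.get("spark.sql.shuffle.partitions")
conf.reset()
value_after_reset = conf.get("spark.sql.shuffle.partitions")
```

In this model, attempting to set spark.sql.extensions raises an error, mirroring the rule that static values can be queried but not changed once the session exists.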

{% include_api_gen generated-static-sql-config-table.html %}

Spark Streaming

SparkR

GraphX

Cluster Managers

Each cluster manager in Spark has additional configuration options. Configurations can be found on the pages for each mode:

YARN

Kubernetes

Standalone Mode

Environment Variables

Certain Spark settings can be configured through environment variables, which are read from the conf/spark-env.sh script in the directory where Spark is installed (or conf/spark-env.cmd on Windows). In Standalone mode, this file can give machine-specific information such as hostnames. It is also sourced when running local Spark applications or submission scripts.

Note that conf/spark-env.sh does not exist by default when Spark is installed. However, you can copy conf/spark-env.sh.template to create it. Make sure you make the copy executable.

The following variables can be set in spark-env.sh:

In addition to the above, there are also options for setting up the Spark standalone cluster scripts, such as number of cores to use on each machine and maximum memory.

Since spark-env.sh is a shell script, some of these can be set programmatically -- for example, you might compute SPARK_LOCAL_IP by looking up the IP of a specific network interface.

Note: When running Spark on YARN in cluster mode, environment variables need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName] property in your conf/spark-defaults.conf file. Environment variables that are set in spark-env.sh will not be reflected in the YARN Application Master process in cluster mode. See the YARN-related Spark Properties for more information.

Configuring Logging

Spark uses log4j for logging. You can configure it by adding a log4j2.properties file in the conf directory. One way to start is to copy the existing templates log4j2.properties.template or log4j2.properties.pattern-layout-template located there.

Structured Logging

Starting from version 4.0.0, spark-submit has adopted the JSON Template Layout for logging, which outputs logs in JSON format. This format facilitates querying logs using Spark SQL with the JSON data source. Additionally, the logs include all Mapped Diagnostic Context (MDC) information for search and debugging purposes.

To configure the layout of structured logging, start with the log4j2.properties.template file.

To query Spark logs using Spark SQL, you can use the following Python code snippet:

from pyspark.util import LogUtils

logDf = spark.read.schema(LogUtils.LOG_SCHEMA).json("path/to/logs")

Or using the following Scala code snippet:

import org.apache.spark.util.LogUtils.LOG_SCHEMA

val logDf = spark.read.schema(LOG_SCHEMA).json("path/to/logs")

Plain Text Logging

If you prefer plain text logging, you can use the log4j2.properties.pattern-layout-template file as a starting point. This is the default configuration used by Spark before the 4.0.0 release. It uses the PatternLayout to write all logs in plain text. MDC information is not included by default. To print it in the logs, you can update the PatternLayout in the file. For example, you can add %X{task_name} to print the task name in the logs. Moreover, you can use spark.sparkContext.setLocalProperty(s"mdc.$name", "value") to add user-specific data to MDC. The key in MDC will be the string mdc.$name.

Overriding configuration directory

To use a configuration directory other than the default “SPARK_HOME/conf”, you can set SPARK_CONF_DIR. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j2.properties, etc.) from this directory.

Inheriting Hadoop Cluster Configuration

If you plan to read from and write to HDFS using Spark, there are two Hadoop configuration files that should be included on Spark's classpath:

  • hdfs-site.xml, which provides default behaviors for the HDFS client.
  • core-site.xml, which sets the default filesystem name.

The location of these configuration files varies across Hadoop versions, but a common location is inside of /etc/hadoop/conf. Some tools create configurations on-the-fly, but offer a mechanism to download copies of them.

To make these files visible to Spark, set HADOOP_CONF_DIR in $SPARK_HOME/conf/spark-env.sh to a location containing the configuration files.

Custom Hadoop/Hive Configuration

If your Spark application is interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive configuration files in Spark's classpath.

Multiple running applications might require different Hadoop/Hive client side configurations. You can copy and modify hdfs-site.xml, core-site.xml, yarn-site.xml, hive-site.xml in Spark's classpath for each application. In a Spark cluster running on YARN, these configuration files are set cluster-wide, and cannot safely be changed by the application.

The better choice is to use Spark Hadoop properties in the form of spark.hadoop.* and Spark Hive properties in the form of spark.hive.*. For example, adding the configuration “spark.hadoop.abc.def=xyz” adds the Hadoop property “abc.def=xyz”, and adding the configuration “spark.hive.abc=xyz” adds the Hive property “hive.abc=xyz”. They can be treated the same as normal Spark properties, which can be set in $SPARK_HOME/conf/spark-defaults.conf.

In some cases, you may want to avoid hard-coding certain configurations in a SparkConf. For instance, Spark allows you to simply create an empty conf and set spark/spark hadoop/spark hive properties.

{% highlight scala %}
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)
{% endhighlight %}

Also, you can modify or add configurations at runtime:

{% highlight bash %}
./bin/spark-submit \
  --name "My app" \
  --master local[4] \
  --conf spark.eventLog.enabled=false \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --conf spark.hadoop.abc.def=xyz \
  --conf spark.hive.abc=xyz \
  myApp.jar
{% endhighlight %}
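The prefix mapping described in this section (spark.hadoop.abc.def becomes abc.def, while spark.hive.abc becomes hive.abc) can be written out as a short function. This Python sketch only mirrors the two examples given above; `to_hadoop_properties` is a hypothetical helper, not part of Spark.

```python
def to_hadoop_properties(spark_props):
    """Derive Hadoop/Hive properties from spark.hadoop.* and spark.hive.* keys."""
    derived = {}
    for key, value in spark_props.items():
        if key.startswith("spark.hadoop."):
            # spark.hadoop.abc.def -> abc.def (the whole prefix is dropped)
            derived[key[len("spark.hadoop."):]] = value
        elif key.startswith("spark.hive."):
            # spark.hive.abc -> hive.abc (only the leading "spark." is dropped)
            derived[key[len("spark."):]] = value
    return derived


derived = to_hadoop_properties({
    "spark.eventLog.enabled": "false",   # ordinary Spark property, ignored here
    "spark.hadoop.abc.def": "xyz",
    "spark.hive.abc": "xyz",
})
```

The asymmetry between the two prefixes is deliberate in the examples above: the Hive property keeps its hive. prefix, while the Hadoop property does not keep hadoop.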

Custom Resource Scheduling and Configuration Overview

GPUs and other accelerators have been widely used for accelerating special workloads, e.g., deep learning and signal processing. Spark now supports requesting and scheduling generic resources, such as GPUs, with a few caveats. The current implementation requires that the resource have addresses that can be allocated by the scheduler. It requires your cluster manager to support and be properly configured with the resources.

There are configurations available to request resources for the driver (spark.driver.resource.{resourceName}.amount), to request resources for the executor(s) (spark.executor.resource.{resourceName}.amount), and to specify the requirements for each task (spark.task.resource.{resourceName}.amount). The spark.driver.resource.{resourceName}.discoveryScript config is required on YARN, Kubernetes and a client-side Driver on Spark Standalone. The spark.executor.resource.{resourceName}.discoveryScript config is required for YARN and Kubernetes. Kubernetes also requires spark.driver.resource.{resourceName}.vendor and/or spark.executor.resource.{resourceName}.vendor. See the config descriptions above for more information on each.

Spark will use the configurations specified to first request containers with the corresponding resources from the cluster manager. Once it gets the container, Spark launches an Executor in that container which will discover what resources the container has and the addresses associated with each resource. The Executor will register with the Driver and report back the resources available to that Executor. The Spark scheduler can then schedule tasks to each Executor and assign specific resource addresses based on the resource requirements the user specified. The user can see the resources assigned to a task using the TaskContext.get().resources api. On the driver, the user can see the resources assigned with the SparkContext resources call. It's then up to the user to use the assigned addresses to do the processing they want or pass those into the ML/AI framework they are using.
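The address assignment step described above can be pictured with a toy model. The Python sketch below is not Spark's scheduler: `assign_task_resources` is a hypothetical function, the addresses are made-up strings, and whole-number task amounts are assumed for simplicity.

```python
def assign_task_resources(executor_free, task_requirements):
    """Assign resource addresses to one task, or return None if it does not fit.

    executor_free: {resourceName: [free addresses]} as reported by an executor.
    task_requirements: {resourceName: amount} requested per task.
    """
    # First check that every requirement can be met, so nothing is
    # handed out when the task does not fit as a whole.
    for name, amount in task_requirements.items():
        if len(executor_free.get(name, [])) < amount:
            return None
    assigned = {}
    for name, amount in task_requirements.items():
        assigned[name] = executor_free[name][:amount]
        executor_free[name] = executor_free[name][amount:]
    return assigned


free = {"gpu": ["0", "1", "2"]}
first = assign_task_resources(free, {"gpu": 2})
second = assign_task_resources(free, {"gpu": 2})
```

In this model the first task receives two GPU addresses and the second cannot be placed on the same executor, since only one address is left.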

See your cluster manager's specific page for requirements and details: YARN, Kubernetes and Standalone Mode. This feature is currently not available in local mode. Also note that local-cluster mode with multiple workers is not supported (see the Standalone documentation).

Stage Level Scheduling Overview

The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows different stages to run with executors that have different resources. A prime example is an ETL stage that runs with executors with just CPUs, followed by an ML stage that needs GPUs. Stage level scheduling allows the user to request different executors that have GPUs when the ML stage runs, rather than having to acquire executors with GPUs at the start of the application and leave them idle while the ETL stage is being run. This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at the stage level, and this is currently supported on YARN, Kubernetes and Standalone clusters. See the YARN page, Kubernetes page, or Standalone page for more implementation details.

See the RDD.withResources and ResourceProfileBuilder APIs for using this feature. When dynamic allocation is disabled, tasks with different task resource requirements will share executors with DEFAULT_RESOURCE_PROFILE. When dynamic allocation is enabled, the current implementation acquires new executors for each ResourceProfile created, and the match currently has to be exact: Spark does not try to fit tasks into an executor that was created with a different ResourceProfile than the one the tasks require. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more than one ResourceProfile with an RDD, Spark will throw an exception by default. See config spark.scheduler.resource.profileMergeConflicts to control that behavior. The current merge strategy Spark implements when spark.scheduler.resource.profileMergeConflicts is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources.
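The "simple max of each resource" merge strategy can be sketched as follows. This Python model represents a profile as a plain dictionary of resource amounts, which is an assumption of the example. `merge_profiles` is a hypothetical function, not the ResourceProfile API.

```python
def merge_profiles(first, second):
    """Merge two conflicting profiles by taking the max of each resource."""
    merged = dict(first)
    for name, amount in second.items():
        # A resource present in only one profile is carried over unchanged.
        merged[name] = max(merged.get(name, 0), amount)
    return merged


etl_profile = {"cores": 4, "memory_mb": 8192}
ml_profile = {"cores": 2, "memory_mb": 16384, "gpu": 1}
merged = merge_profiles(etl_profile, ml_profile)
```

The merged profile is at least as large as either input for every resource, so tasks from both original profiles fit, at the cost of possibly over-provisioning.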

Push-based shuffle overview

Push-based shuffle helps improve the reliability and performance of Spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, which converts small random disk reads by external shuffle services into large sequential reads. The possibility of better data locality for reduce tasks additionally helps minimize network IO. Push-based shuffle takes priority over batch fetch in some scenarios, such as partition coalescing when merged output is available.

External Shuffle service(server) side configuration options

Client side configuration options