| |
| <!DOCTYPE html> |
| <!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--> |
| <!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]--> |
| <!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>Job Scheduling - Spark 3.5.0 Documentation</title> |
| |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <link rel="preconnect" href="https://fonts.googleapis.com"> |
| <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> |
| <link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&Courier+Prime:wght@400;700&display=swap" rel="stylesheet"> |
| <link href="css/custom.css" rel="stylesheet"> |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| <link rel="stylesheet" href="css/docsearch.min.css" /> |
| <link rel="stylesheet" href="css/docsearch.css"> |
| |
| <!-- Matomo --> |
| <script type="text/javascript"> |
| var _paq = window._paq = window._paq || []; |
| /* tracker methods like "setCustomDimension" should be called before "trackPageView" */ |
| _paq.push(["disableCookies"]); |
| _paq.push(['trackPageView']); |
| _paq.push(['enableLinkTracking']); |
| (function() { |
| var u="https://analytics.apache.org/"; |
| _paq.push(['setTrackerUrl', u+'matomo.php']); |
| _paq.push(['setSiteId', '40']); |
| var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; |
| g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); |
| })(); |
| </script> |
| <!-- End Matomo Code --> |
| </head> |
| <body class="global"> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| |
| <nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar"> |
| <div class="navbar-brand"><a href="index.html"> |
| <img src="img/spark-logo-rev.svg" width="141" height="72"/></a><span class="version">3.5.0</span> |
| </div> |
| <button class="navbar-toggler" type="button" data-toggle="collapse" |
| data-target="#navbarCollapse" aria-controls="navbarCollapse" |
| aria-expanded="false" aria-label="Toggle navigation"> |
| <span class="navbar-toggler-icon"></span> |
| </button> |
| <div class="collapse navbar-collapse" id="navbarCollapse"> |
| <ul class="navbar-nav me-auto"> |
| <li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a> |
| <div class="dropdown-menu" aria-labelledby="navbarQuickStart"> |
| <a class="dropdown-item" href="quick-start.html">Quick Start</a> |
| <a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a> |
| <a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a> |
| <a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a> |
| <a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a> |
| <a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a> |
| <a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a> |
| <a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a> |
| <a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a> |
| <div class="dropdown-menu" aria-labelledby="navbarAPIDocs"> |
| <a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a> |
| <a class="dropdown-item" href="api/java/index.html">Java</a> |
| <a class="dropdown-item" href="api/python/index.html">Python</a> |
| <a class="dropdown-item" href="api/R/index.html">R</a> |
| <a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a> |
| <div class="dropdown-menu" aria-labelledby="navbarDeploying"> |
| <a class="dropdown-item" href="cluster-overview.html">Overview</a> |
| <a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a> |
| <a class="dropdown-item" href="running-on-mesos.html">Mesos</a> |
| <a class="dropdown-item" href="running-on-yarn.html">YARN</a> |
| <a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a> |
| <div class="dropdown-menu" aria-labelledby="navbarMore"> |
| <a class="dropdown-item" href="configuration.html">Configuration</a> |
| <a class="dropdown-item" href="monitoring.html">Monitoring</a> |
| <a class="dropdown-item" href="tuning.html">Tuning Guide</a> |
| <a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a> |
| <a class="dropdown-item" href="security.html">Security</a> |
| <a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a> |
| <a class="dropdown-item" href="migration-guide.html">Migration Guide</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="building-spark.html">Building Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a> |
| </div> |
| </li> |
| |
| <li class="nav-item"> |
| <input type="text" id="docsearch-input" placeholder="Search the docs…"> |
| </li> |
| </ul> |
| <!--<span class="navbar-text navbar-right"><span class="version-text">v3.5.0</span></span>--> |
| </div> |
| </nav> |
| |
| |
| |
| <div class="container"> |
| |
| |
| <div class="content mr-3" id="content"> |
| |
| |
| <h1 class="title">Job Scheduling</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#overview" id="markdown-toc-overview">Overview</a></li> |
| <li><a href="#scheduling-across-applications" id="markdown-toc-scheduling-across-applications">Scheduling Across Applications</a> <ul> |
| <li><a href="#dynamic-resource-allocation" id="markdown-toc-dynamic-resource-allocation">Dynamic Resource Allocation</a> <ul> |
| <li><a href="#caveats" id="markdown-toc-caveats">Caveats</a></li> |
| <li><a href="#configuration-and-setup" id="markdown-toc-configuration-and-setup">Configuration and Setup</a></li> |
| <li><a href="#resource-allocation-policy" id="markdown-toc-resource-allocation-policy">Resource Allocation Policy</a> <ul> |
| <li><a href="#request-policy" id="markdown-toc-request-policy">Request Policy</a></li> |
| <li><a href="#remove-policy" id="markdown-toc-remove-policy">Remove Policy</a></li> |
| </ul> |
| </li> |
| <li><a href="#graceful-decommission-of-executors" id="markdown-toc-graceful-decommission-of-executors">Graceful Decommission of Executors</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li><a href="#scheduling-within-an-application" id="markdown-toc-scheduling-within-an-application">Scheduling Within an Application</a> <ul> |
| <li><a href="#fair-scheduler-pools" id="markdown-toc-fair-scheduler-pools">Fair Scheduler Pools</a></li> |
| <li><a href="#default-behavior-of-pools" id="markdown-toc-default-behavior-of-pools">Default Behavior of Pools</a></li> |
| <li><a href="#configuring-pool-properties" id="markdown-toc-configuring-pool-properties">Configuring Pool Properties</a></li> |
| <li><a href="#scheduling-using-jdbc-connections" id="markdown-toc-scheduling-using-jdbc-connections">Scheduling using JDBC Connections</a></li> |
| <li><a href="#concurrent-jobs-in-pyspark" id="markdown-toc-concurrent-jobs-in-pyspark">Concurrent Jobs in PySpark</a></li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h1 id="overview">Overview</h1> |
| |
| <p>Spark has several facilities for scheduling resources between computations. First, recall that, as described |
| in the <a href="cluster-overview.html">cluster mode overview</a>, each Spark application (instance of SparkContext) |
| runs an independent set of executor processes. The cluster managers that Spark runs on provide |
| facilities for <a href="#scheduling-across-applications">scheduling across applications</a>. Second, |
| <em>within</em> each Spark application, multiple “jobs” (Spark actions) may be running concurrently |
| if they were submitted by different threads. This is common if your application is serving requests |
| over the network. Spark includes a <a href="#scheduling-within-an-application">fair scheduler</a> to schedule resources within each SparkContext.</p> |
| |
| <h1 id="scheduling-across-applications">Scheduling Across Applications</h1> |
| |
| <p>When running on a cluster, each Spark application gets an independent set of executor JVMs that only |
| run tasks and store data for that application. If multiple users need to share your cluster, there are |
| different options to manage allocation, depending on the cluster manager.</p> |
| |
| <p>The simplest option, available on all cluster managers, is <em>static partitioning</em> of resources. With |
| this approach, each application is given a maximum amount of resources it can use and holds onto them |
| for its whole duration. This is the approach used in Spark’s <a href="spark-standalone.html">standalone</a> |
| and <a href="running-on-yarn.html">YARN</a> modes, as well as the |
| <a href="running-on-mesos.html#mesos-run-modes">coarse-grained Mesos mode</a>. |
| Resource allocation can be configured as follows, based on the cluster type:</p> |
| |
| <ul> |
<li><strong>Standalone mode:</strong> By default, applications submitted to a standalone cluster run in
FIFO (first-in-first-out) order, and each application will try to use all available nodes. You can cap
the number of cores an application uses by setting the <code class="language-plaintext highlighter-rouge">spark.cores.max</code> configuration property in it,
or change the default for applications that don’t set this limit through <code class="language-plaintext highlighter-rouge">spark.deploy.defaultCores</code>.
Finally, in addition to controlling cores, each application’s <code class="language-plaintext highlighter-rouge">spark.executor.memory</code> setting controls
its memory use (see the sketch after this list).</li>
| <li><strong>Mesos:</strong> To use static partitioning on Mesos, set the <code class="language-plaintext highlighter-rouge">spark.mesos.coarse</code> configuration property to <code class="language-plaintext highlighter-rouge">true</code>, |
| and optionally set <code class="language-plaintext highlighter-rouge">spark.cores.max</code> to limit each application’s resource share as in the standalone mode. |
| You should also set <code class="language-plaintext highlighter-rouge">spark.executor.memory</code> to control the executor memory.</li> |
| <li><strong>YARN:</strong> The <code class="language-plaintext highlighter-rouge">--num-executors</code> option to the Spark YARN client controls how many executors it will allocate |
| on the cluster (<code class="language-plaintext highlighter-rouge">spark.executor.instances</code> as configuration property), while <code class="language-plaintext highlighter-rouge">--executor-memory</code> |
| (<code class="language-plaintext highlighter-rouge">spark.executor.memory</code> configuration property) and <code class="language-plaintext highlighter-rouge">--executor-cores</code> (<code class="language-plaintext highlighter-rouge">spark.executor.cores</code> configuration |
| property) control the resources per executor. For more information, see the |
| <a href="running-on-yarn.html">YARN Spark Properties</a>.</li> |
| </ul> |
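<p>As a hedged sketch of static partitioning in standalone mode, an application could cap the cores and memory it holds for its whole duration as follows. The master URL and the numeric values are illustrative only, not recommendations:</p>

<figure class="highlight"><pre><code class="language-scala" data-lang="scala">import org.apache.spark.{SparkConf, SparkContext}

// Static partitioning sketch for standalone mode (values are illustrative).
val conf = new SparkConf()
  .setMaster("spark://master:7077")      // hypothetical standalone master URL
  .setAppName("static-partitioning-sketch")
  .set("spark.cores.max", "48")          // cap on total cores across the cluster
  .set("spark.executor.memory", "4g")    // memory per executor
val sc = new SparkContext(conf)</code></pre></figure>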
| |
| <p>A second option available on Mesos is <em>dynamic sharing</em> of CPU cores. In this mode, each Spark application |
| still has a fixed and independent memory allocation (set by <code class="language-plaintext highlighter-rouge">spark.executor.memory</code>), but when the |
| application is not running tasks on a machine, other applications may run tasks on those cores. This mode |
| is useful when you expect large numbers of not overly active applications, such as shell sessions from |
| separate users. However, it comes with a risk of less predictable latency, because it may take a while for |
| an application to gain back cores on one node when it has work to do. To use this mode, simply use a |
| <code class="language-plaintext highlighter-rouge">mesos://</code> URL and set <code class="language-plaintext highlighter-rouge">spark.mesos.coarse</code> to false.</p> |
| |
| <p>Note that none of the modes currently provide memory sharing across applications. If you would like to share |
| data this way, we recommend running a single server application that can serve multiple requests by querying |
| the same RDDs.</p> |
| |
| <h2 id="dynamic-resource-allocation">Dynamic Resource Allocation</h2> |
| |
| <p>Spark provides a mechanism to dynamically adjust the resources your application occupies based |
| on the workload. This means that your application may give resources back to the cluster if they |
| are no longer used and request them again later when there is demand. This feature is particularly |
| useful if multiple applications share resources in your Spark cluster.</p> |
| |
| <p>This feature is disabled by default and available on all coarse-grained cluster managers, i.e. |
| <a href="spark-standalone.html">standalone mode</a>, <a href="running-on-yarn.html">YARN mode</a>, |
| <a href="running-on-mesos.html#mesos-run-modes">Mesos coarse-grained mode</a> and <a href="running-on-kubernetes.html">K8s mode</a>.</p> |
| |
| <h3 id="caveats">Caveats</h3> |
| |
| <ul> |
| <li>In <a href="spark-standalone.html">standalone mode</a>, without explicitly setting <code class="language-plaintext highlighter-rouge">spark.executor.cores</code>, each executor will get all the available cores of a worker. In this case, when dynamic allocation enabled, spark will possibly acquire much more executors than expected. When you want to use dynamic allocation in <a href="spark-standalone.html">standalone mode</a>, you are recommended to explicitly set cores for each executor before the issue <a href="https://issues.apache.org/jira/browse/SPARK-30299">SPARK-30299</a> got fixed.</li> |
| </ul> |
| |
| <h3 id="configuration-and-setup">Configuration and Setup</h3> |
| |
<p>There are several ways to use this feature.
Regardless of which approach you choose, your application must first set <code class="language-plaintext highlighter-rouge">spark.dynamicAllocation.enabled</code> to <code class="language-plaintext highlighter-rouge">true</code>. Additionally, one of the following must hold (a minimal sketch follows the list):</p>
| |
| <ul> |
| <li>your application must set <code class="language-plaintext highlighter-rouge">spark.shuffle.service.enabled</code> to <code class="language-plaintext highlighter-rouge">true</code> after you set up an <em>external shuffle service</em> on each worker node in the same cluster, or</li> |
| <li>your application must set <code class="language-plaintext highlighter-rouge">spark.dynamicAllocation.shuffleTracking.enabled</code> to <code class="language-plaintext highlighter-rouge">true</code>, or</li> |
| <li>your application must set both <code class="language-plaintext highlighter-rouge">spark.decommission.enabled</code> and <code class="language-plaintext highlighter-rouge">spark.storage.decommission.shuffleBlocks.enabled</code> to <code class="language-plaintext highlighter-rouge">true</code>, or</li> |
<li>your application must configure <code class="language-plaintext highlighter-rouge">spark.shuffle.sort.io.plugin.class</code> to use a custom <code class="language-plaintext highlighter-rouge">ShuffleDataIO</code> whose <code class="language-plaintext highlighter-rouge">ShuffleDriverComponents</code> supports reliable storage.</li>
| </ul> |
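<p>As a minimal sketch, the shuffle-tracking option above requires no additional cluster-side setup; the other options swap the second setting for the corresponding properties listed above:</p>

<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Assuming conf is your SparkConf variable.
// Enable dynamic allocation with shuffle tracking (no external shuffle service needed).
conf.set("spark.dynamicAllocation.enabled", "true")
conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")</code></pre></figure>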
| |
<p>The purpose of the external shuffle service, shuffle tracking, or a <code class="language-plaintext highlighter-rouge">ShuffleDriverComponents</code> that supports reliable storage is to allow executors to be removed
without deleting the shuffle files they have written (described in more detail
<a href="job-scheduling.html#graceful-decommission-of-executors">below</a>). While it is simple to enable shuffle tracking, the way to set up the external shuffle service varies across cluster managers:</p>
| |
| <p>In standalone mode, simply start your workers with <code class="language-plaintext highlighter-rouge">spark.shuffle.service.enabled</code> set to <code class="language-plaintext highlighter-rouge">true</code>.</p> |
| |
| <p>In Mesos coarse-grained mode, run <code class="language-plaintext highlighter-rouge">$SPARK_HOME/sbin/start-mesos-shuffle-service.sh</code> on all |
| worker nodes with <code class="language-plaintext highlighter-rouge">spark.shuffle.service.enabled</code> set to <code class="language-plaintext highlighter-rouge">true</code>. For instance, you may do so |
| through Marathon.</p> |
| |
| <p>In YARN mode, follow the instructions <a href="running-on-yarn.html#configuring-the-external-shuffle-service">here</a>.</p> |
| |
| <p>All other relevant configurations are optional and under the <code class="language-plaintext highlighter-rouge">spark.dynamicAllocation.*</code> and |
| <code class="language-plaintext highlighter-rouge">spark.shuffle.service.*</code> namespaces. For more detail, see the |
| <a href="configuration.html#dynamic-allocation">configurations page</a>.</p> |
| |
| <h3 id="resource-allocation-policy">Resource Allocation Policy</h3> |
| |
| <p>At a high level, Spark should relinquish executors when they are no longer used and acquire |
| executors when they are needed. Since there is no definitive way to predict whether an executor |
| that is about to be removed will run a task in the near future, or whether a new executor that is |
| about to be added will actually be idle, we need a set of heuristics to determine when to remove |
| and request executors.</p> |
| |
| <h4 id="request-policy">Request Policy</h4> |
| |
| <p>A Spark application with dynamic allocation enabled requests additional executors when it has |
| pending tasks waiting to be scheduled. This condition necessarily implies that the existing set |
| of executors is insufficient to simultaneously saturate all tasks that have been submitted but |
| not yet finished.</p> |
| |
| <p>Spark requests executors in rounds. The actual request is triggered when there have been pending |
| tasks for <code class="language-plaintext highlighter-rouge">spark.dynamicAllocation.schedulerBacklogTimeout</code> seconds, and then triggered again |
| every <code class="language-plaintext highlighter-rouge">spark.dynamicAllocation.sustainedSchedulerBacklogTimeout</code> seconds thereafter if the queue |
| of pending tasks persists. Additionally, the number of executors requested in each round increases |
| exponentially from the previous round. For instance, an application will add 1 executor in the |
| first round, and then 2, 4, 8 and so on executors in the subsequent rounds.</p> |
| |
| <p>The motivation for an exponential increase policy is twofold. First, an application should request |
executors cautiously in the beginning in case it turns out that only a few additional executors are
| sufficient. This echoes the justification for TCP slow start. Second, the application should be |
| able to ramp up its resource usage in a timely manner in case it turns out that many executors are |
| actually needed.</p> |
| |
| <h4 id="remove-policy">Remove Policy</h4> |
| |
| <p>The policy for removing executors is much simpler. A Spark application removes an executor when |
| it has been idle for more than <code class="language-plaintext highlighter-rouge">spark.dynamicAllocation.executorIdleTimeout</code> seconds. Note that, |
| under most circumstances, this condition is mutually exclusive with the request condition, in that |
| an executor should not be idle if there are still pending tasks to be scheduled.</p> |
| |
| <h3 id="graceful-decommission-of-executors">Graceful Decommission of Executors</h3> |
| |
<p>Before dynamic allocation, if a Spark executor exits when the associated application has also exited,
then all state associated with the executor is no longer needed and can be safely discarded.
With dynamic allocation, however, the application is still running when an executor is explicitly
removed. If the application attempts to access state stored in or written by the executor, it will
have to recompute that state. Thus, Spark needs a mechanism to decommission an executor
gracefully by preserving its state before removing it.</p>
| |
| <p>This requirement is especially important for shuffles. During a shuffle, the Spark executor first |
| writes its own map outputs locally to disk, and then acts as the server for those files when other |
| executors attempt to fetch them. In the event of stragglers, which are tasks that run for much |
| longer than their peers, dynamic allocation may remove an executor before the shuffle completes, |
| in which case the shuffle files written by that executor must be recomputed unnecessarily.</p> |
| |
| <p>The solution for preserving shuffle files is to use an external shuffle service, also introduced |
| in Spark 1.2. This service refers to a long-running process that runs on each node of your cluster |
| independently of your Spark applications and their executors. If the service is enabled, Spark |
| executors will fetch shuffle files from the service instead of from each other. This means any |
| shuffle state written by an executor may continue to be served beyond the executor’s lifetime.</p> |
| |
<p>In addition to writing shuffle files, executors also cache data either on disk or in memory.
When an executor is removed, however, all cached data will no longer be accessible. To mitigate this,
by default executors containing cached data are never removed. You can configure this behavior with
<code class="language-plaintext highlighter-rouge">spark.dynamicAllocation.cachedExecutorIdleTimeout</code>. When <code class="language-plaintext highlighter-rouge">spark.shuffle.service.fetch.rdd.enabled</code>
is set to <code class="language-plaintext highlighter-rouge">true</code>, Spark can use the external shuffle service to fetch disk-persisted RDD blocks. With
dynamic allocation, if this feature is enabled, executors holding only disk-persisted blocks are considered
idle after <code class="language-plaintext highlighter-rouge">spark.dynamicAllocation.executorIdleTimeout</code> and will be released accordingly. In future releases,
the cached data may be preserved through an off-heap storage, similar in spirit to how shuffle files are preserved
through the external shuffle service.</p>
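<p>As a sketch, the two settings above can be combined so that executors holding only disk-persisted RDD blocks are eventually released; the timeout value is illustrative:</p>

<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Assuming conf is your SparkConf variable.
// Serve disk-persisted RDD blocks from the external shuffle service, and allow
// executors that hold only cached data to be removed after an hour of idleness.
conf.set("spark.shuffle.service.fetch.rdd.enabled", "true")
conf.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1h")</code></pre></figure>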
| |
| <h1 id="scheduling-within-an-application">Scheduling Within an Application</h1> |
| |
| <p>Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if |
| they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. <code class="language-plaintext highlighter-rouge">save</code>, |
| <code class="language-plaintext highlighter-rouge">collect</code>) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe |
| and supports this use case to enable applications that serve multiple requests (e.g. queries for |
| multiple users).</p> |
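<p>A minimal sketch of this pattern: two threads each trigger an action on the same SparkContext, and the scheduler runs the resulting jobs concurrently (the RDDs and sizes here are purely illustrative):</p>

<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Assuming sc is your SparkContext variable.
// Each thread submits its own job; the scheduler is safe to call from both.
val t1 = new Thread(() => println(sc.parallelize(1 to 1000000).reduce(_ + _)))
val t2 = new Thread(() => println(sc.parallelize(1 to 1000000).count()))
t1.start(); t2.start()
t1.join(); t2.join()</code></pre></figure>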
| |
| <p>By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and |
| reduce phases), and the first job gets priority on all available resources while its stages have tasks to |
| launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use |
| the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are |
| large, then later jobs may be delayed significantly.</p> |
| |
| <p>Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, |
| Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share |
| of cluster resources. This means that short jobs submitted while a long job is running can start receiving |
| resources right away and still get good response times, without waiting for the long job to finish. This |
| mode is best for multi-user settings.</p> |
| |
| <p>This feature is disabled by default and available on all coarse-grained cluster managers, i.e. |
| <a href="spark-standalone.html">standalone mode</a>, <a href="running-on-yarn.html">YARN mode</a>, |
| <a href="running-on-kubernetes.html">K8s mode</a> and <a href="running-on-mesos.html#mesos-run-modes">Mesos coarse-grained mode</a>. |
| To enable the fair scheduler, simply set the <code class="language-plaintext highlighter-rouge">spark.scheduler.mode</code> property to <code class="language-plaintext highlighter-rouge">FAIR</code> when configuring |
| a SparkContext:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="py">setMaster</span><span class="o">(...).</span><span class="py">setAppName</span><span class="o">(...)</span> |
| <span class="nv">conf</span><span class="o">.</span><span class="py">set</span><span class="o">(</span><span class="s">"spark.scheduler.mode"</span><span class="o">,</span> <span class="s">"FAIR"</span><span class="o">)</span> |
| <span class="k">val</span> <span class="nv">sc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkContext</span><span class="o">(</span><span class="n">conf</span><span class="o">)</span></code></pre></figure> |
| |
| <h2 id="fair-scheduler-pools">Fair Scheduler Pools</h2> |
| |
| <p>The fair scheduler also supports grouping jobs into <em>pools</em>, and setting different scheduling options |
| (e.g. weight) for each pool. This can be useful to create a “high-priority” pool for more important jobs, |
| for example, or to group the jobs of each user together and give <em>users</em> equal shares regardless of how |
| many concurrent jobs they have instead of giving <em>jobs</em> equal shares. This approach is modeled after the |
| <a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html">Hadoop Fair Scheduler</a>.</p> |
| |
| <p>Without any intervention, newly submitted jobs go into a <em>default pool</em>, but jobs’ pools can be set by |
| adding the <code class="language-plaintext highlighter-rouge">spark.scheduler.pool</code> “local property” to the SparkContext in the thread that’s submitting them. |
| This is done as follows:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Assuming sc is your SparkContext variable</span> |
| <span class="nv">sc</span><span class="o">.</span><span class="py">setLocalProperty</span><span class="o">(</span><span class="s">"spark.scheduler.pool"</span><span class="o">,</span> <span class="s">"pool1"</span><span class="o">)</span></code></pre></figure> |
| |
| <p>After setting this local property, <em>all</em> jobs submitted within this thread (by calls in this thread |
| to <code class="language-plaintext highlighter-rouge">RDD.save</code>, <code class="language-plaintext highlighter-rouge">count</code>, <code class="language-plaintext highlighter-rouge">collect</code>, etc) will use this pool name. The setting is per-thread to make |
| it easy to have a thread run multiple jobs on behalf of the same user. If you’d like to clear the |
| pool that a thread is associated with, simply call:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nv">sc</span><span class="o">.</span><span class="py">setLocalProperty</span><span class="o">(</span><span class="s">"spark.scheduler.pool"</span><span class="o">,</span> <span class="kc">null</span><span class="o">)</span></code></pre></figure> |
| |
| <h2 id="default-behavior-of-pools">Default Behavior of Pools</h2> |
| |
| <p>By default, each pool gets an equal share of the cluster (also equal in share to each job in the default |
| pool), but inside each pool, jobs run in FIFO order. For example, if you create one pool per user, this |
| means that each user will get an equal share of the cluster, and that each user’s queries will run in |
| order instead of later queries taking resources from that user’s earlier ones.</p> |
| |
| <h2 id="configuring-pool-properties">Configuring Pool Properties</h2> |
| |
| <p>Specific pools’ properties can also be modified through a configuration file. Each pool supports three |
| properties:</p> |
| |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">schedulingMode</code>: This can be FIFO or FAIR, to control whether jobs within the pool queue up behind |
| each other (the default) or share the pool’s resources fairly.</li> |
| <li><code class="language-plaintext highlighter-rouge">weight</code>: This controls the pool’s share of the cluster relative to other pools. By default, all pools |
have a weight of 1. If you give a specific pool a weight of 2, for example, it will get 2x the
resources of other active pools. Setting a high weight such as 1000 also makes it possible to implement
| <em>priority</em> between pools—in essence, the weight-1000 pool will always get to launch tasks first |
| whenever it has jobs active.</li> |
| <li><code class="language-plaintext highlighter-rouge">minShare</code>: Apart from an overall weight, each pool can be given a <em>minimum shares</em> (as a number of |
| CPU cores) that the administrator would like it to have. The fair scheduler always attempts to meet |
| all active pools’ minimum shares before redistributing extra resources according to the weights. |
| The <code class="language-plaintext highlighter-rouge">minShare</code> property can, therefore, be another way to ensure that a pool can always get up to a |
| certain number of resources (e.g. 10 cores) quickly without giving it a high priority for the rest |
| of the cluster. By default, each pool’s <code class="language-plaintext highlighter-rouge">minShare</code> is 0.</li> |
| </ul> |
| |
| <p>The pool properties can be set by creating an XML file, similar to <code class="language-plaintext highlighter-rouge">conf/fairscheduler.xml.template</code>, |
and either putting a file named <code class="language-plaintext highlighter-rouge">fairscheduler.xml</code> on the classpath, or setting the <code class="language-plaintext highlighter-rouge">spark.scheduler.allocation.file</code> property in your
<a href="configuration.html#spark-properties">SparkConf</a>. The file path respects the Hadoop configuration and can be either a local file path or an HDFS file path.</p>
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// scheduler file at local</span> |
| <span class="nv">conf</span><span class="o">.</span><span class="py">set</span><span class="o">(</span><span class="s">"spark.scheduler.allocation.file"</span><span class="o">,</span> <span class="s">"file:///path/to/file"</span><span class="o">)</span> |
| <span class="c1">// scheduler file at hdfs</span> |
| <span class="nv">conf</span><span class="o">.</span><span class="py">set</span><span class="o">(</span><span class="s">"spark.scheduler.allocation.file"</span><span class="o">,</span> <span class="s">"hdfs:///path/to/file"</span><span class="o">)</span></code></pre></figure> |
| |
| <p>The format of the XML file is simply a <code class="language-plaintext highlighter-rouge"><pool></code> element for each pool, with different elements |
| within it for the various settings. For example:</p> |
| |
| <figure class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="cp"><?xml version="1.0"?></span> |
| <span class="nt"><allocations></span> |
| <span class="nt"><pool</span> <span class="na">name=</span><span class="s">"production"</span><span class="nt">></span> |
| <span class="nt"><schedulingMode></span>FAIR<span class="nt"></schedulingMode></span> |
| <span class="nt"><weight></span>1<span class="nt"></weight></span> |
| <span class="nt"><minShare></span>2<span class="nt"></minShare></span> |
| <span class="nt"></pool></span> |
| <span class="nt"><pool</span> <span class="na">name=</span><span class="s">"test"</span><span class="nt">></span> |
| <span class="nt"><schedulingMode></span>FIFO<span class="nt"></schedulingMode></span> |
| <span class="nt"><weight></span>2<span class="nt"></weight></span> |
| <span class="nt"><minShare></span>3<span class="nt"></minShare></span> |
| <span class="nt"></pool></span> |
| <span class="nt"></allocations></span></code></pre></figure> |
| |
| <p>A full example is also available in <code class="language-plaintext highlighter-rouge">conf/fairscheduler.xml.template</code>. Note that any pools not |
| configured in the XML file will simply get default values for all settings (scheduling mode FIFO, |
| weight 1, and minShare 0).</p> |
| |
| <h2 id="scheduling-using-jdbc-connections">Scheduling using JDBC Connections</h2> |
| <p>To set a <a href="job-scheduling.html#fair-scheduler-pools">Fair Scheduler</a> pool for a JDBC client session, |
| users can set the <code class="language-plaintext highlighter-rouge">spark.sql.thriftserver.scheduler.pool</code> variable:</p> |
| |
| <figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SET</span> <span class="n">spark</span><span class="p">.</span><span class="k">sql</span><span class="p">.</span><span class="n">thriftserver</span><span class="p">.</span><span class="n">scheduler</span><span class="p">.</span><span class="n">pool</span><span class="o">=</span><span class="n">accounting</span><span class="p">;</span></code></pre></figure> |
| |
| <h2 id="concurrent-jobs-in-pyspark">Concurrent Jobs in PySpark</h2> |
| |
<p>By default, PySpark does not synchronize PVM threads with JVM threads, and
launching multiple jobs in multiple PVM threads does not guarantee that each job runs
in its own corresponding JVM thread. Due to this limitation, setting a different job group
via <code class="language-plaintext highlighter-rouge">sc.setJobGroup</code> in a separate PVM thread does not take effect as expected, which also prevents cancelling the job via <code class="language-plaintext highlighter-rouge">sc.cancelJobGroup</code>
later.</p>
| |
| <p><code class="language-plaintext highlighter-rouge">pyspark.InheritableThread</code> is recommended to use together for a PVM thread to inherit the inheritable attributes |
| such as local properties in a JVM thread.</p> |
| |
| |
| </div> |
| |
| <!-- /container --> |
| </div> |
| |
| <script src="js/vendor/jquery-3.5.1.min.js"></script> |
| <script src="js/vendor/bootstrap.bundle.min.js"></script> |
| |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <script type="text/javascript" src="js/vendor/docsearch.min.js"></script> |
| <script type="text/javascript"> |
| // DocSearch is entirely free and automated. DocSearch is built in two parts: |
| // 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link |
| // in your website and extract content from every page it traverses. It then pushes this |
| // content to an Algolia index. |
| // 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index |
| // to your search input and display its results in a dropdown UI. If you want to find more |
| // details on how works DocSearch, check the docs of DocSearch. |
| docsearch({ |
| apiKey: 'd62f962a82bc9abb53471cb7b89da35e', |
| appId: 'RAI69RXRSK', |
| indexName: 'apache_spark', |
| inputSelector: '#docsearch-input', |
| enhancedSearchInput: true, |
| algoliaOptions: { |
| 'facetFilters': ["version:3.5.0"] |
| }, |
| debug: false // Set debug to true if you want to inspect the dropdown |
| }); |
| |
| </script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function(d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function(){ |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], |
| displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' + |
| '?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |