| <!DOCTYPE html> |
| <html class="no-js"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>Tuning - Spark 4.1.0-preview1 Documentation</title> |
| |
| <meta name="description" content="Tuning and performance optimization guide for Spark 4.1.0-preview1"> |
| |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <link rel="preconnect" href="https://fonts.googleapis.com"> |
| <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> |
| <link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&Courier+Prime:wght@400;700&display=swap" rel="stylesheet"> |
| <link href="css/custom.css" rel="stylesheet"> |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| <link rel="stylesheet" href="css/docsearch.min.css" /> |
| <link rel="stylesheet" href="css/docsearch.css"> |
| |
| |
| <!-- Matomo --> |
| <script> |
| var _paq = window._paq = window._paq || []; |
| /* tracker methods like "setCustomDimension" should be called before "trackPageView" */ |
| _paq.push(["disableCookies"]); |
| _paq.push(['trackPageView']); |
| _paq.push(['enableLinkTracking']); |
| (function() { |
| var u="https://analytics.apache.org/"; |
| _paq.push(['setTrackerUrl', u+'matomo.php']); |
| _paq.push(['setSiteId', '40']); |
| var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; |
| g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); |
| })(); |
| </script> |
| <!-- End Matomo Code --> |
| |
| |
| </head> |
| <body class="global"> |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| <nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar"> |
| <div class="navbar-brand"><a href="index.html"> |
| <img src="https://spark.apache.org/images/spark-logo-rev.svg" width="141" height="72"/></a><span class="version">4.1.0-preview1</span> |
| </div> |
| <button class="navbar-toggler" type="button" data-toggle="collapse" |
| data-target="#navbarCollapse" aria-controls="navbarCollapse" |
| aria-expanded="false" aria-label="Toggle navigation"> |
| <span class="navbar-toggler-icon"></span> |
| </button> |
| <div class="collapse navbar-collapse" id="navbarCollapse"> |
| <ul class="navbar-nav me-auto"> |
| <li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a> |
| <div class="dropdown-menu" aria-labelledby="navbarQuickStart"> |
| <a class="dropdown-item" href="quick-start.html">Quick Start</a> |
| <a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a> |
| <a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a> |
| <a class="dropdown-item" href="streaming/index.html">Structured Streaming</a> |
| <a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a> |
| <a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a> |
| <a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a> |
| <a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a> |
| <a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a> |
| <a class="dropdown-item" href="declarative-pipelines-programming-guide.html">Declarative Pipelines</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a> |
| <div class="dropdown-menu" aria-labelledby="navbarAPIDocs"> |
| <a class="dropdown-item" href="api/python/index.html">Python</a> |
| <a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a> |
| <a class="dropdown-item" href="api/java/index.html">Java</a> |
| <a class="dropdown-item" href="api/R/index.html">R</a> |
| <a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a> |
| <div class="dropdown-menu" aria-labelledby="navbarDeploying"> |
| <a class="dropdown-item" href="cluster-overview.html">Overview</a> |
| <a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a> |
| <a class="dropdown-item" href="running-on-yarn.html">YARN</a> |
| <a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a> |
| <div class="dropdown-menu" aria-labelledby="navbarMore"> |
| <a class="dropdown-item" href="configuration.html">Configuration</a> |
| <a class="dropdown-item" href="monitoring.html">Monitoring</a> |
| <a class="dropdown-item" href="tuning.html">Tuning Guide</a> |
| <a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a> |
| <a class="dropdown-item" href="security.html">Security</a> |
| <a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a> |
| <a class="dropdown-item" href="migration-guide.html">Migration Guide</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="building-spark.html">Building Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a> |
| </div> |
| </li> |
| |
| <li class="nav-item"> |
| <input type="text" id="docsearch-input" placeholder="Search the docs…"> |
| </li> |
| </ul> |
| <!--<span class="navbar-text navbar-right"><span class="version-text">v4.1.0-preview1</span></span>--> |
| </div> |
| </nav> |
| |
| |
| |
| <div class="container"> |
| |
| <div class="content mr-3" id="content"> |
| |
| |
| <h1 class="title">Tuning Spark</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#data-serialization" id="markdown-toc-data-serialization">Data Serialization</a></li> |
| <li><a href="#memory-tuning" id="markdown-toc-memory-tuning">Memory Tuning</a> <ul> |
| <li><a href="#memory-management-overview" id="markdown-toc-memory-management-overview">Memory Management Overview</a></li> |
| <li><a href="#determining-memory-consumption" id="markdown-toc-determining-memory-consumption">Determining Memory Consumption</a></li> |
| <li><a href="#tuning-data-structures" id="markdown-toc-tuning-data-structures">Tuning Data Structures</a></li> |
| <li><a href="#serialized-rdd-storage" id="markdown-toc-serialized-rdd-storage">Serialized RDD Storage</a></li> |
| <li><a href="#garbage-collection-tuning" id="markdown-toc-garbage-collection-tuning">Garbage Collection Tuning</a></li> |
| </ul> |
| </li> |
| <li><a href="#other-considerations" id="markdown-toc-other-considerations">Other Considerations</a> <ul> |
| <li><a href="#level-of-parallelism" id="markdown-toc-level-of-parallelism">Level of Parallelism</a></li> |
| <li><a href="#parallel-listing-on-input-paths" id="markdown-toc-parallel-listing-on-input-paths">Parallel Listing on Input Paths</a></li> |
| <li><a href="#memory-usage-of-reduce-tasks" id="markdown-toc-memory-usage-of-reduce-tasks">Memory Usage of Reduce Tasks</a></li> |
| <li><a href="#broadcasting-large-variables" id="markdown-toc-broadcasting-large-variables">Broadcasting Large Variables</a></li> |
| <li><a href="#data-locality" id="markdown-toc-data-locality">Data Locality</a></li> |
| </ul> |
| </li> |
| <li><a href="#summary" id="markdown-toc-summary">Summary</a></li> |
| </ul> |
| |
| <p>Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked |
| by any resource in the cluster: CPU, network bandwidth, or memory. |
| Most often, if the data fits in memory, the bottleneck is network bandwidth, but sometimes, you |
| also need to do some tuning, such as |
| <a href="rdd-programming-guide.html#rdd-persistence">storing RDDs in serialized form</a>, to |
| decrease memory usage. |
| This guide will cover two main topics: data serialization, which is crucial for good network |
| performance and can also reduce memory use, and memory tuning. We also sketch several smaller topics.</p> |
| |
| <h1 id="data-serialization">Data Serialization</h1> |
| |
| <p>Serialization plays an important role in the performance of any distributed application. |
| Formats that are slow to serialize objects into, or consume a large number of |
| bytes, will greatly slow down the computation. |
| Often, this will be the first thing you should tune to optimize a Spark application. |
| Spark aims to strike a balance between convenience (allowing you to work with any Java type |
| in your operations) and performance. It provides two serialization libraries:</p> |
| |
| <ul> |
| <li><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/io/Serializable.html">Java serialization</a>: |
| By default, Spark serializes objects using Java’s <code class="language-plaintext highlighter-rouge">ObjectOutputStream</code> framework, and can work |
| with any class you create that implements |
| <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/io/Serializable.html"><code class="language-plaintext highlighter-rouge">java.io.Serializable</code></a>. |
| You can also control the performance of your serialization more closely by extending |
| <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/io/Externalizable.html"><code class="language-plaintext highlighter-rouge">java.io.Externalizable</code></a>. |
| Java serialization is flexible but often quite slow, and leads to large |
| serialized formats for many classes.</li> |
| <li><a href="https://github.com/EsotericSoftware/kryo">Kryo serialization</a>: Spark can also use |
| the Kryo library (version 4) to serialize objects more quickly. Kryo is significantly |
| faster and more compact than Java serialization (often as much as 10x), but does not support all |
| <code class="language-plaintext highlighter-rouge">Serializable</code> types and requires you to <em>register</em> the classes you’ll use in the program in advance |
| for best performance.</li> |
| </ul> |
| |
| <p>You can switch to using Kryo by initializing your job with a <a href="configuration.html#spark-properties">SparkConf</a> |
| and calling <code class="language-plaintext highlighter-rouge">conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")</code>. |
| This setting configures the serializer used not only for shuffling data between worker |
| nodes but also for serializing RDDs to disk. The only reason Kryo is not the default is the custom |
| registration requirement, but we recommend trying it in any network-intensive application. |
| Since Spark 2.0.0, Spark internally uses the Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.</p> |
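| |
| <p>A minimal sketch of switching to Kryo (the application name is illustrative):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">import org.apache.spark.{SparkConf, SparkContext} |
| // Switch the serializer used for shuffle data and serialized RDD storage. |
| val conf = new SparkConf().setAppName("KryoExample") |
| conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") |
| val sc = new SparkContext(conf)</code></pre></figure> |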
| |
| <p>Spark automatically includes Kryo serializers for the many commonly used core Scala classes covered |
| in the AllScalaRegistrar from the <a href="https://github.com/twitter/chill">Twitter chill</a> library.</p> |
| |
| <p>To register your own custom classes with Kryo, use the <code class="language-plaintext highlighter-rouge">registerKryoClasses</code> method.</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="py">setMaster</span><span class="o">(...).</span><span class="py">setAppName</span><span class="o">(...)</span> |
| <span class="nv">conf</span><span class="o">.</span><span class="py">registerKryoClasses</span><span class="o">(</span><span class="nc">Array</span><span class="o">(</span><span class="n">classOf</span><span class="o">[</span><span class="kt">MyClass1</span><span class="o">],</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">MyClass2</span><span class="o">]))</span> |
| <span class="k">val</span> <span class="nv">sc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkContext</span><span class="o">(</span><span class="n">conf</span><span class="o">)</span></code></pre></figure> |
| |
| <p>The <a href="https://github.com/EsotericSoftware/kryo">Kryo documentation</a> describes more advanced |
| registration options, such as adding custom serialization code.</p> |
| |
| <p>If your objects are large, you may also need to increase the <code class="language-plaintext highlighter-rouge">spark.kryoserializer.buffer</code> |
| <a href="configuration.html#compression-and-serialization">config</a>. This value needs to be large enough |
| to hold the <em>largest</em> object you will serialize.</p> |
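| |
| <p>For example (the sizes below are illustrative; pick values that fit your largest record):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Initial per-core serialization buffer, and the maximum it may grow to. |
| conf.set("spark.kryoserializer.buffer", "1m") |
| conf.set("spark.kryoserializer.buffer.max", "256m")</code></pre></figure> |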
| |
| <p>Finally, if you don’t register your custom classes, Kryo will still work, but it will have to store |
| the full class name with each object, which is wasteful.</p> |
| |
| <h1 id="memory-tuning">Memory Tuning</h1> |
| |
| <p>There are three considerations in tuning memory usage: the <em>amount</em> of memory used by your objects |
| (you may want your entire dataset to fit in memory), the <em>cost</em> of accessing those objects, and the |
| overhead of <em>garbage collection</em> (if you have high turnover in terms of objects).</p> |
| |
| <p>By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space |
| than the “raw” data inside their fields. This is due to several reasons:</p> |
| |
| <ul> |
| <li>Each distinct Java object has an “object header”, which is about 16 bytes and contains information |
| such as a pointer to its class. For an object with very little data in it (say one <code class="language-plaintext highlighter-rouge">Int</code> field), this |
| can be bigger than the data.</li> |
| <li>Java <code class="language-plaintext highlighter-rouge">String</code>s have about 40 bytes of overhead over the raw string data (since they store it in an |
| array of <code class="language-plaintext highlighter-rouge">Char</code>s and keep extra data such as the length), and store each character |
| as <em>two</em> bytes due to <code class="language-plaintext highlighter-rouge">String</code>’s internal usage of UTF-16 encoding. Thus a 10-character string can |
| easily consume 60 bytes.</li> |
| <li>Common collection classes, such as <code class="language-plaintext highlighter-rouge">HashMap</code> and <code class="language-plaintext highlighter-rouge">LinkedList</code>, use linked data structures, where |
| there is a “wrapper” object for each entry (e.g. <code class="language-plaintext highlighter-rouge">Map.Entry</code>). This object not only has a header, |
| but also pointers (typically 8 bytes each) to the next object in the list.</li> |
| <li>Collections of primitive types often store them as “boxed” objects such as <code class="language-plaintext highlighter-rouge">java.lang.Integer</code>.</li> |
| </ul> |
| |
| <p>This section will start with an overview of memory management in Spark, then discuss specific |
| strategies the user can take to make more efficient use of memory in their application. In |
| particular, we will describe how to determine the memory usage of your objects, and how to |
| improve it – either by changing your data structures, or by storing data in a serialized |
| format. We will then cover tuning Spark’s cache size and the Java garbage collector.</p> |
| |
| <h2 id="memory-management-overview">Memory Management Overview</h2> |
| |
| <p>Memory usage in Spark largely falls under one of two categories: execution and storage. |
| Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, |
| while storage memory refers to that used for caching and propagating internal data across the |
| cluster. In Spark, execution and storage share a unified region (M). When no execution memory is |
| used, storage can acquire all the available memory and vice versa. Execution may evict storage |
| if necessary, but only until total storage memory usage falls under a certain threshold (R). |
| In other words, <code class="language-plaintext highlighter-rouge">R</code> describes a subregion within <code class="language-plaintext highlighter-rouge">M</code> where cached blocks are never evicted. |
| Storage may not evict execution due to complexities in implementation.</p> |
| |
| <p>This design ensures several desirable properties. First, applications that do not use caching |
| can use the entire space for execution, obviating unnecessary disk spills. Second, applications |
| that do use caching can reserve a minimum storage space (R) where their data blocks are immune |
| to being evicted. Lastly, this approach provides reasonable out-of-the-box performance for a |
| variety of workloads without requiring user expertise of how memory is divided internally.</p> |
| |
| <p>Although there are two relevant configurations, the typical user should not need to adjust them |
| as the default values are applicable to most workloads:</p> |
| |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">spark.memory.fraction</code> expresses the size of <code class="language-plaintext highlighter-rouge">M</code> as a fraction of the (JVM heap space - 300MiB) |
| (default 0.6). The rest of the space (40%) is reserved for user data structures, internal |
| metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually |
| large records.</li> |
| <li><code class="language-plaintext highlighter-rouge">spark.memory.storageFraction</code> expresses the size of <code class="language-plaintext highlighter-rouge">R</code> as a fraction of <code class="language-plaintext highlighter-rouge">M</code> (default 0.5). |
| <code class="language-plaintext highlighter-rouge">R</code> is the storage space within <code class="language-plaintext highlighter-rouge">M</code> where cached blocks immune to being evicted by execution.</li> |
| </ul> |
| |
| <p>The value of <code class="language-plaintext highlighter-rouge">spark.memory.fraction</code> should be set in order to fit this amount of heap space |
| comfortably within the JVM’s old or “tenured” generation. See the discussion of advanced GC |
| tuning below for details.</p> |
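| |
| <p>If you do need to adjust them, they are ordinary Spark properties. A sketch (the values shown are the defaults):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">// M = spark.memory.fraction * (heap - 300MiB); R = spark.memory.storageFraction * M. |
| val conf = new SparkConf() |
|   .set("spark.memory.fraction", "0.6") |
|   .set("spark.memory.storageFraction", "0.5")</code></pre></figure> |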
| |
| <h2 id="determining-memory-consumption">Determining Memory Consumption</h2> |
| |
| <p>The best way to gauge how much memory a dataset will require is to create an RDD, put it |
| into the cache, and look at the “Storage” page in the web UI. The page will tell you how much memory the RDD |
| is occupying.</p> |
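| |
| <p>A minimal sketch (the dataset here is made up):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">val rdd = sc.parallelize(1 to 1000000).map(i => (i, i.toString)) |
| rdd.cache() |
| // An action materializes the cached partitions so they show up on the "Storage" page. |
| rdd.count()</code></pre></figure> |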
| |
| <p>To estimate the memory consumption of a particular object, use <code class="language-plaintext highlighter-rouge">SizeEstimator</code>’s <code class="language-plaintext highlighter-rouge">estimate</code> method. |
| This is useful for experimenting with different data layouts to trim memory usage, as well as |
| determining the amount of space a broadcast variable will occupy on each executor heap.</p> |
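| |
| <p>For instance:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">import org.apache.spark.util.SizeEstimator |
| // Approximate in-memory footprint, in bytes, of an object graph. |
| println(SizeEstimator.estimate(Map(1 -> "a", 2 -> "b")))</code></pre></figure> |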
| |
| <h2 id="tuning-data-structures">Tuning Data Structures</h2> |
| |
| <p>The first way to reduce memory consumption is to avoid the Java features that add overhead, such as |
| pointer-based data structures and wrapper objects. There are several ways to do this:</p> |
| |
| <ol> |
| <li>Design your data structures to prefer arrays of objects, and primitive types, instead of the |
| standard Java or Scala collection classes (e.g. <code class="language-plaintext highlighter-rouge">HashMap</code>). The <a href="http://fastutil.di.unimi.it">fastutil</a> |
| library provides convenient collection classes for primitive types that are compatible with the |
| Java standard library (see the sketch after this list).</li> |
| <li>Avoid nested structures with a lot of small objects and pointers when possible.</li> |
| <li>Consider using numeric IDs or enumeration objects instead of strings for keys.</li> |
| <li>If you have less than 32 GiB of RAM, set the JVM flag <code class="language-plaintext highlighter-rouge">-XX:+UseCompressedOops</code> to make pointers |
| four bytes instead of eight. You can add these options in |
| <a href="configuration.html#environment-variables"><code class="language-plaintext highlighter-rouge">spark-env.sh</code></a>.</li> |
| </ol> |
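| |
| <p>As a sketch of the first suggestion above, compare a boxed map with its fastutil |
| equivalent (this assumes fastutil is on your classpath):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap |
| // Boxed: every entry allocates a wrapper object plus boxed keys and values. |
| val boxed = new java.util.HashMap[java.lang.Integer, java.lang.Integer]() |
| boxed.put(1, 42) |
| // Primitive: keys and values live in flat int arrays, with no per-entry objects. |
| val primitive = new Int2IntOpenHashMap() |
| primitive.put(1, 42)</code></pre></figure> |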
| |
| <h2 id="serialized-rdd-storage">Serialized RDD Storage</h2> |
| |
| <p>When your objects are still too large to efficiently store despite this tuning, a much simpler way |
| to reduce memory usage is to store them in <em>serialized</em> form, using the serialized StorageLevels in |
| the <a href="rdd-programming-guide.html#rdd-persistence">RDD persistence API</a>, such as <code class="language-plaintext highlighter-rouge">MEMORY_ONLY_SER</code>. |
| Spark will then store each RDD partition as one large byte array. |
| The only downside of storing data in serialized form is slower access times, due to having to |
| deserialize each object on the fly. |
| We highly recommend <a href="#data-serialization">using Kryo</a> if you want to cache data in serialized form, as |
| it leads to much smaller sizes than Java serialization (and certainly than raw Java objects).</p> |
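| |
| <p>For an existing RDD this is a one-line change:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">import org.apache.spark.storage.StorageLevel |
| // Each partition is cached as a single serialized byte array. |
| rdd.persist(StorageLevel.MEMORY_ONLY_SER)</code></pre></figure> |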
| |
| <h2 id="garbage-collection-tuning">Garbage Collection Tuning</h2> |
| |
| <p>JVM garbage collection can be a problem when you have large “churn” in terms of the RDDs |
| stored by your program. (It is usually not a problem in programs that just read an RDD once |
| and then run many operations on it.) When Java needs to evict old objects to make room for new ones, it will |
| need to trace through all your Java objects and find the unused ones. The main point to remember here is |
| that <em>the cost of garbage collection is proportional to the number of Java objects</em>, so using data |
| structures with fewer objects (e.g. an array of <code class="language-plaintext highlighter-rouge">Int</code>s instead of a <code class="language-plaintext highlighter-rouge">LinkedList</code>) greatly lowers |
| this cost. An even better method is to persist objects in serialized form, as described above: now |
| there will be only <em>one</em> object (a byte array) per RDD partition. Before trying other |
| techniques, the first thing to try if GC is a problem is to use <a href="#serialized-rdd-storage">serialized caching</a>.</p> |
| |
| <p>GC can also be a problem due to interference between your tasks’ working memory (the |
| amount of space needed to run the task) and the RDDs cached on your nodes. We will discuss how to control |
| the space allocated to the RDD cache to mitigate this.</p> |
| |
| <p><strong>Measuring the Impact of GC</strong></p> |
| |
| <p>The first step in GC tuning is to collect statistics on how frequently garbage collection occurs and the amount of |
| time spent in GC. This can be done by adding <code class="language-plaintext highlighter-rouge">-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</code> to the Java options. (See the <a href="configuration.html#dynamically-loading-spark-properties">configuration guide</a> for info on passing Java options to Spark jobs.) Next time your Spark job is run, you will see messages printed in the worker’s logs |
| each time a garbage collection occurs. Note these logs will be on your cluster’s worker nodes (in the <code class="language-plaintext highlighter-rouge">stdout</code> files in |
| their work directories), <em>not</em> on your driver program.</p> |
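| |
| <p>Note that these are the legacy JDK 8 flags; on the JDK 17 that Spark 4 uses by default, the |
| equivalent is unified logging via <code class="language-plaintext highlighter-rouge">-Xlog:gc*</code>. A sketch of passing the option to executors:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">// GC events go to each executor's stdout, decorated with timestamps. |
| val conf = new SparkConf() |
|   .set("spark.executor.extraJavaOptions", "-Xlog:gc*:stdout:time")</code></pre></figure> |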
| |
| <p><strong>Advanced GC Tuning</strong></p> |
| |
| <p>To further tune garbage collection, we first need to understand some basic information about memory management in the JVM:</p> |
| |
| <ul> |
| <li> |
| <p>Java Heap space is divided into two regions: Young and Old. The Young generation is meant to hold short-lived objects |
| while the Old generation is intended for objects with longer lifetimes.</p> |
| </li> |
| <li> |
| <p>The Young generation is further divided into three regions [Eden, Survivor1, Survivor2].</p> |
| </li> |
| <li> |
| <p>A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects |
| that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an object is old |
| enough or Survivor2 is full, it is moved to Old. Finally, when Old is close to full, a full GC is invoked.</p> |
| </li> |
| </ul> |
| |
| <p>The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored in the Old generation and that |
| the Young generation is sufficiently sized to store short-lived objects. This will help avoid full GCs to collect |
| temporary objects created during task execution. Some steps which may be useful are:</p> |
| |
| <ul> |
| <li> |
| <p>Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times |
| before a task completes, it means that there isn’t enough memory available for executing tasks.</p> |
| </li> |
| <li> |
| <p>If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. You |
| can set the size of the Eden to be an over-estimate of how much memory each task will need. If the size of Eden |
| is determined to be <code class="language-plaintext highlighter-rouge">E</code>, then you can set the size of the Young generation using the option <code class="language-plaintext highlighter-rouge">-Xmn=4/3*E</code>. (The scaling |
| up by 4/3 is to account for space used by survivor regions as well.)</p> |
| </li> |
| <li> |
| <p>In the GC stats that are printed, if the OldGen is close to being full, reduce the amount of |
| memory used for caching by lowering <code class="language-plaintext highlighter-rouge">spark.memory.fraction</code>; it is better to cache fewer |
| objects than to slow down task execution. Alternatively, consider decreasing the size of |
| the Young generation. This means lowering <code class="language-plaintext highlighter-rouge">-Xmn</code> if you’ve set it as above. If not, try changing the |
| value of the JVM’s <code class="language-plaintext highlighter-rouge">NewRatio</code> parameter. Many JVMs default this to 2, meaning that the Old generation |
| occupies 2/3 of the heap. It should be large enough such that this fraction exceeds <code class="language-plaintext highlighter-rouge">spark.memory.fraction</code>.</p> |
| </li> |
| <li> |
| <p>Since 4.0.0, Spark uses JDK 17 by default, which also makes the G1GC garbage collector the default. Note that with |
| large executor heap sizes, it may be important to increase the G1 region size with <code class="language-plaintext highlighter-rouge">-XX:G1HeapRegionSize</code>.</p> |
| </li> |
| <li> |
| <p>As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using |
| the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the |
| size of the block. So if we wish to have 3 or 4 tasks’ worth of working space, and the HDFS block size is 128 MiB, |
| we can estimate the size of Eden to be <code class="language-plaintext highlighter-rouge">4*3*128MiB</code>.</p> |
| </li> |
| <li> |
| <p>Monitor how the frequency and time taken by garbage collection changes with the new settings.</p> |
| </li> |
| </ul> |
| |
| <p>Our experience suggests that the effect of GC tuning depends on your application and the amount of memory available. |
| There are <a href="https://docs.oracle.com/en/java/javase/17/gctuning/introduction-garbage-collection-tuning.html">many more tuning options</a> described online, |
| but at a high level, managing how frequently full GC takes place can help in reducing the overhead.</p> |
| |
| <p>GC tuning flags for executors can be specified by setting <code class="language-plaintext highlighter-rouge">spark.executor.defaultJavaOptions</code> or <code class="language-plaintext highlighter-rouge">spark.executor.extraJavaOptions</code> in |
| a job’s configuration.</p> |
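| |
| <p>Tying the HDFS example above together: <code class="language-plaintext highlighter-rouge">E = 4 * 3 * 128MiB = 1536MiB</code>, so the Young |
| generation would be sized at <code class="language-plaintext highlighter-rouge">4/3 * E = 2048MiB</code>. A sketch of passing such flags (the values are illustrative):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">val conf = new SparkConf() |
|   // Pin the Young generation per the Eden estimate, and enlarge G1 regions for big heaps. |
|   .set("spark.executor.extraJavaOptions", "-Xmn2048m -XX:G1HeapRegionSize=16m")</code></pre></figure> |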
| |
| <h1 id="other-considerations">Other Considerations</h1> |
| |
| <h2 id="level-of-parallelism">Level of Parallelism</h2> |
| |
| <p>Clusters will not be fully utilized unless you set the level of parallelism for each operation high |
| enough. Spark automatically sets the number of “map” tasks to run on each file according to its size |
| (though you can control it through optional parameters to <code class="language-plaintext highlighter-rouge">SparkContext.textFile</code>, etc), and for |
| distributed “reduce” operations, such as <code class="language-plaintext highlighter-rouge">groupByKey</code> and <code class="language-plaintext highlighter-rouge">reduceByKey</code>, it uses the largest |
| parent RDD’s number of partitions. You can pass the level of parallelism as a second argument |
| (see the <a href="api/scala/org/apache/spark/rdd/PairRDDFunctions.html"><code class="language-plaintext highlighter-rouge">spark.PairRDDFunctions</code></a> documentation), |
| or set the config property <code class="language-plaintext highlighter-rouge">spark.default.parallelism</code> to change the default. |
| In general, we recommend 2-3 tasks per CPU core in your cluster.</p> |
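| |
| <p>Both knobs in one sketch (the <code class="language-plaintext highlighter-rouge">pairs</code> RDD and the partition counts are illustrative):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Per operation: pass the partition count to the shuffle directly. |
| val counts = pairs.reduceByKey(_ + _, 200) |
| // Or globally, e.g. roughly 3 tasks per core on a 64-core cluster: |
| conf.set("spark.default.parallelism", "192")</code></pre></figure> |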
| |
| <h2 id="parallel-listing-on-input-paths">Parallel Listing on Input Paths</h2> |
| |
| <p>Sometimes you may also need to increase directory listing parallelism when the job input has a large number of directories; |
| otherwise the process could take a very long time, especially when running against an object store like S3. |
| If your job works on an RDD with Hadoop input formats (e.g., via <code class="language-plaintext highlighter-rouge">SparkContext.sequenceFile</code>), the parallelism is |
| controlled via <a href="https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml"><code class="language-plaintext highlighter-rouge">spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads</code></a> (the current default is 1).</p> |
| |
| <p>For Spark SQL with file-based data sources, you can tune <code class="language-plaintext highlighter-rouge">spark.sql.sources.parallelPartitionDiscovery.threshold</code> and |
| <code class="language-plaintext highlighter-rouge">spark.sql.sources.parallelPartitionDiscovery.parallelism</code> to improve listing parallelism. Please |
| refer to <a href="sql-performance-tuning.html">Spark SQL performance tuning guide</a> for more details.</p> |
| |
| <h2 id="memory-usage-of-reduce-tasks">Memory Usage of Reduce Tasks</h2> |
| |
| <p>Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the |
| working set of one of your tasks, such as one of the reduce tasks in <code class="language-plaintext highlighter-rouge">groupByKey</code>, was too large. |
| Spark’s shuffle operations (<code class="language-plaintext highlighter-rouge">sortByKey</code>, <code class="language-plaintext highlighter-rouge">groupByKey</code>, <code class="language-plaintext highlighter-rouge">reduceByKey</code>, <code class="language-plaintext highlighter-rouge">join</code>, etc) build a hash table |
| within each task to perform the grouping, which can often be large. The simplest fix here is to |
| <em>increase the level of parallelism</em>, so that each task’s input set is smaller. Spark can efficiently |
| support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has |
| a low task launching cost, so you can safely increase the level of parallelism to more than the |
| number of cores in your cluster.</p> |
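| |
| <p>For example, if a grouping shuffle runs out of memory, request more (and therefore smaller) tasks |
| (the <code class="language-plaintext highlighter-rouge">pairs</code> RDD and the count are illustrative):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Each of the 2000 reduce tasks now builds a much smaller hash table. |
| val grouped = pairs.groupByKey(2000)</code></pre></figure> |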
| |
| <h2 id="broadcasting-large-variables">Broadcasting Large Variables</h2> |
| |
| <p>Using the <a href="rdd-programming-guide.html#broadcast-variables">broadcast functionality</a> |
| available in <code class="language-plaintext highlighter-rouge">SparkContext</code> can greatly reduce the size of each serialized task, and the cost |
| of launching a job over a cluster. If your tasks use any large object from the driver program |
| inside of them (e.g. a static lookup table), consider turning it into a broadcast variable. |
| Spark prints the serialized size of each task on the master, so you can look at that to |
| decide whether your tasks are too large; in general, tasks larger than about 20 KiB are probably |
| worth optimizing.</p> |
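| |
| <p>A sketch (the lookup table and the <code class="language-plaintext highlighter-rouge">words</code> RDD are illustrative):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Ship the table to each executor once, instead of once per task. |
| val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2)) |
| val resolved = words.map(w => lookup.value.getOrElse(w, 0))</code></pre></figure> |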
| |
| <h2 id="data-locality">Data Locality</h2> |
| |
| <p>Data locality can have a major impact on the performance of Spark jobs. If data and the code that |
| operates on it are together, then computation tends to be fast. But if code and data are separated, |
| one must move to the other. Typically, it is faster to ship serialized code from place to place than |
| a chunk of data because code size is much smaller than data. Spark builds its scheduling around |
| this general principle of data locality.</p> |
| |
| <p>Data locality is how close data is to the code processing it. There are several levels of |
| locality based on the data’s current location. In order from closest to farthest:</p> |
| |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">PROCESS_LOCAL</code> data is in the same JVM as the running code. This is the best locality |
| possible.</li> |
| <li><code class="language-plaintext highlighter-rouge">NODE_LOCAL</code> data is on the same node. Examples might be in HDFS on the same node, or in |
| another executor on the same node. This is a little slower than <code class="language-plaintext highlighter-rouge">PROCESS_LOCAL</code> because the data |
| has to travel between processes.</li> |
| <li><code class="language-plaintext highlighter-rouge">NO_PREF</code> data is accessed equally quickly from anywhere and has no locality preference.</li> |
| <li><code class="language-plaintext highlighter-rouge">RACK_LOCAL</code> data is on the same rack of servers. Data is on a different server on the same rack |
| so needs to be sent over the network, typically through a single switch.</li> |
| <li><code class="language-plaintext highlighter-rouge">ANY</code> data is elsewhere on the network and not in the same rack.</li> |
| </ul> |
| |
| <p>Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In |
| situations where there is no unprocessed data on any idle executor, Spark switches to lower locality |
| levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same |
| server, or b) immediately start a new task in a farther away place that requires moving data there.</p> |
| |
| <p>What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout |
| expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback |
| between each level can be configured individually or all together in one parameter; see the |
| <code class="language-plaintext highlighter-rouge">spark.locality</code> parameters on the <a href="configuration.html#scheduling">configuration page</a> for details. |
| You should increase these settings if your tasks are long and you see poor locality, but the default |
| usually works well.</p> |
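| |
| <p>For instance (the timeouts are illustrative):</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Base wait before falling back a locality level, plus a per-level override. |
| conf.set("spark.locality.wait", "10s") |
| conf.set("spark.locality.wait.node", "10s")</code></pre></figure> |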
| |
| <h1 id="summary">Summary</h1> |
| |
| <p>This has been a short guide to point out the main concerns you should know about when tuning a |
| Spark application – most importantly, data serialization and memory tuning. For most programs, |
| switching to Kryo serialization and persisting data in serialized form will solve most common |
| performance issues. Feel free to ask on the |
| <a href="https://spark.apache.org/community.html">Spark mailing list</a> about other tuning best practices.</p> |
| |
| </div> |
| |
| <!-- /container --> |
| </div> |
| |
| <script src="js/vendor/jquery-3.5.1.min.js"></script> |
| <script src="js/vendor/bootstrap.bundle.min.js"></script> |
| |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <script type="text/javascript" src="js/vendor/docsearch.min.js"></script> |
| <script type="text/javascript"> |
| // DocSearch is entirely free and automated. DocSearch is built in two parts: |
| // 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link |
| // in your website and extracts content from every page it traverses. It then pushes this |
| // content to an Algolia index. |
| // 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index |
| // to your search input and display its results in a dropdown UI. If you want to find more |
| // details on how DocSearch works, check the DocSearch docs. |
| docsearch({ |
| apiKey: 'd62f962a82bc9abb53471cb7b89da35e', |
| appId: 'RAI69RXRSK', |
| indexName: 'apache_spark', |
| inputSelector: '#docsearch-input', |
| enhancedSearchInput: true, |
| algoliaOptions: { |
| 'facetFilters': ["version:4.1.0-preview1"] |
| }, |
| debug: false // Set debug to true if you want to inspect the dropdown |
| }); |
| |
| </script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function(d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function(){ |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], |
| displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' + |
| '?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |