<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Performance Tuning - Spark 3.2.0 Documentation</title>
<link rel="stylesheet" href="css/bootstrap.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<meta name="viewport" content="width=device-width">
<link rel="stylesheet" href="css/main.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/docsearch.js@2/dist/cdn/docsearch.min.css" />
<link rel="stylesheet" href="css/docsearch.css">
<!-- Google analytics script -->
<script type="text/javascript">
var _gaq = _gaq || [];
_gaq.push(['_setAccount', 'UA-32518208-2']);
_gaq.push(['_trackPageview']);
(function() {
var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
})();
</script>
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<nav class="navbar fixed-top navbar-expand-md navbar-light bg-light" id="topbar">
<div class="container">
<div class="navbar-header">
<div class="navbar-brand"><a href="index.html">
<img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">3.2.0</span>
</div>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse"
data-target="#navbarCollapse" aria-controls="navbarCollapse"
aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav">
<!--TODO(andyk): Add class="active" attribute to li somehow.-->
<li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a>
<div class="dropdown-menu" aria-labelledby="navbarQuickStart">
<a class="dropdown-item" href="quick-start.html">Quick Start</a>
<a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a>
<a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a>
<a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a>
<a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a>
<a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a>
<a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a>
<a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a>
<a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a>
<div class="dropdown-menu" aria-labelledby="navbarAPIDocs">
<a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a>
<a class="dropdown-item" href="api/java/index.html">Java</a>
<a class="dropdown-item" href="api/python/index.html">Python</a>
<a class="dropdown-item" href="api/R/index.html">R</a>
<a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a>
<div class="dropdown-menu" aria-labelledby="navbarDeploying">
<a class="dropdown-item" href="cluster-overview.html">Overview</a>
<a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a>
<a class="dropdown-item" href="running-on-mesos.html">Mesos</a>
<a class="dropdown-item" href="running-on-yarn.html">YARN</a>
<a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a>
<div class="dropdown-menu" aria-labelledby="navbarMore">
<a class="dropdown-item" href="configuration.html">Configuration</a>
<a class="dropdown-item" href="monitoring.html">Monitoring</a>
<a class="dropdown-item" href="tuning.html">Tuning Guide</a>
<a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a>
<a class="dropdown-item" href="security.html">Security</a>
<a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a>
<a class="dropdown-item" href="migration-guide.html">Migration Guide</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="building-spark.html">Building Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a>
</div>
</li>
<li class="nav-item">
<input type="text" id="docsearch-input" placeholder="Search the docs…">
</li>
</ul>
<!--<span class="navbar-text navbar-right"><span class="version-text">v3.2.0</span></span>-->
</div>
</div>
</nav>
<div class="container-wrapper">
<div class="left-menu-wrapper">
<div class="left-menu">
<h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3>
<ul>
<li>
<a href="sql-getting-started.html">
Getting Started
</a>
</li>
<li>
<a href="sql-data-sources.html">
Data Sources
</a>
</li>
<li>
<a href="sql-performance-tuning.html">
<b>Performance Tuning</b>
</a>
<ul>
<li>
<a href="sql-performance-tuning.html#caching-data-in-memory">
Caching Data In Memory
</a>
</li>
<li>
<a href="sql-performance-tuning.html#other-configuration-options">
Other Configuration Options
</a>
</li>
<li>
<a href="sql-performance-tuning.html#join-strategy-hints-for-sql-queries">
Join Strategy Hints for SQL Queries
</a>
</li>
<li>
<a href="sql-performance-tuning.html#coalesce-hints-for-sql-queries">
Coalesce Hints for SQL Queries
</a>
</li>
<li>
<a href="sql-performance-tuning.html#adaptive-query-execution">
Adaptive Query Execution
</a>
</li>
</ul>
</li>
<li>
<a href="sql-distributed-sql-engine.html">
Distributed SQL Engine
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html">
PySpark Usage Guide for Pandas with Apache Arrow
</a>
</li>
<li>
<a href="sql-migration-old.html">
Migration Guide
</a>
</li>
<li>
<a href="sql-ref.html">
SQL Reference
</a>
</li>
</ul>
</div>
</div>
<input id="nav-trigger" class="nav-trigger" checked type="checkbox">
<label for="nav-trigger"></label>
<div class="content-with-sidebar mr-3" id="content">
<h1 class="title">Performance Tuning</h1>
<ul id="markdown-toc">
<li><a href="#caching-data-in-memory" id="markdown-toc-caching-data-in-memory">Caching Data In Memory</a></li>
<li><a href="#other-configuration-options" id="markdown-toc-other-configuration-options">Other Configuration Options</a></li>
<li><a href="#join-strategy-hints-for-sql-queries" id="markdown-toc-join-strategy-hints-for-sql-queries">Join Strategy Hints for SQL Queries</a></li>
<li><a href="#coalesce-hints-for-sql-queries" id="markdown-toc-coalesce-hints-for-sql-queries">Coalesce Hints for SQL Queries</a></li>
<li><a href="#adaptive-query-execution" id="markdown-toc-adaptive-query-execution">Adaptive Query Execution</a> <ul>
<li><a href="#coalescing-post-shuffle-partitions" id="markdown-toc-coalescing-post-shuffle-partitions">Coalescing Post Shuffle Partitions</a></li>
<li><a href="#converting-sort-merge-join-to-broadcast-join" id="markdown-toc-converting-sort-merge-join-to-broadcast-join">Converting sort-merge join to broadcast join</a></li>
<li><a href="#converting-sort-merge-join-to-shuffled-hash-join" id="markdown-toc-converting-sort-merge-join-to-shuffled-hash-join">Converting sort-merge join to shuffled hash join</a></li>
<li><a href="#optimizing-skew-join" id="markdown-toc-optimizing-skew-join">Optimizing Skew Join</a></li>
</ul>
</li>
</ul>
<p>For some workloads, it is possible to improve performance either by caching data in memory or by
turning on some experimental options.</p>
<h2 id="caching-data-in-memory">Caching Data In Memory</h2>
<p>Spark SQL can cache tables using an in-memory columnar format by calling <code class="language-plaintext highlighter-rouge">spark.catalog.cacheTable("tableName")</code> or <code class="language-plaintext highlighter-rouge">dataFrame.cache()</code>.
Then Spark SQL will scan only required columns and will automatically tune compression to minimize
memory usage and GC pressure. You can call <code class="language-plaintext highlighter-rouge">spark.catalog.uncacheTable("tableName")</code> or <code class="language-plaintext highlighter-rouge">dataFrame.unpersist()</code> to remove the table from memory.</p>
<p>Configuration of in-memory caching can be done using the <code class="language-plaintext highlighter-rouge">setConf</code> method on <code class="language-plaintext highlighter-rouge">SparkSession</code> or by running
<code class="language-plaintext highlighter-rouge">SET key=value</code> commands using SQL.</p>
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
<td>true</td>
<td>
When set to true, Spark SQL will automatically select a compression codec for each column based
on statistics of the data.
</td>
<td>1.0.1</td>
</tr>
<tr>
<td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
<td>10000</td>
<td>
Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization
and compression, but risk OOMs when caching data.
</td>
<td>1.1.1</td>
</tr>
</table>
<h2 id="other-configuration-options">Other Configuration Options</h2>
<p>The following options can also be used to tune the performance of query execution. It is possible
that these options will be deprecated in a future release as more optimizations are performed automatically.</p>
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.sql.files.maxPartitionBytes</code></td>
<td>134217728 (128 MB)</td>
<td>
The maximum number of bytes to pack into a single partition when reading files.
This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
</td>
<td>2.0.0</td>
</tr>
<tr>
<td><code>spark.sql.files.openCostInBytes</code></td>
<td>4194304 (4 MB)</td>
<td>
The estimated cost to open a file, measured by the number of bytes that could be scanned in the
same time. This is used when putting multiple files into a partition. It is better to over-estimate;
then the partitions with small files will be faster than partitions with bigger files (which are
scheduled first). This configuration is effective only when using file-based sources such as Parquet,
JSON and ORC.
</td>
<td>2.0.0</td>
</tr>
<tr>
<td><code>spark.sql.files.minPartitionNum</code></td>
<td>Default Parallelism</td>
<td>
The suggested (not guaranteed) minimum number of split file partitions. If not set, the default
value is <code>spark.default.parallelism</code>. This configuration is effective only when using file-based
sources such as Parquet, JSON and ORC.
</td>
<td>3.1.0</td>
</tr>
<tr>
<td><code>spark.sql.broadcastTimeout</code></td>
<td>300</td>
<td>
<p>
Timeout in seconds for the broadcast wait time in broadcast joins.
</p>
</td>
<td>1.3.0</td>
</tr>
<tr>
<td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
<td>10485760 (10 MB)</td>
<td>
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently
statistics are only supported for Hive Metastore tables where the command
<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.
</td>
<td>1.1.0</td>
</tr>
<tr>
<td><code>spark.sql.shuffle.partitions</code></td>
<td>200</td>
<td>
Configures the number of partitions to use when shuffling data for joins or aggregations.
</td>
<td>1.1.0</td>
</tr>
<tr>
<td><code>spark.sql.sources.parallelPartitionDiscovery.threshold</code></td>
<td>32</td>
<td>
Configures the threshold to enable parallel listing for job input paths. If the number of
input paths is larger than this threshold, Spark will list the files by using a Spark distributed job.
Otherwise, it will fall back to sequential listing. This configuration is only effective when
using file-based data sources such as Parquet, ORC and JSON.
</td>
<td>1.5.0</td>
</tr>
<tr>
<td><code>spark.sql.sources.parallelPartitionDiscovery.parallelism</code></td>
<td>10000</td>
<td>
Configures the maximum listing parallelism for job input paths. If the number of input
paths is larger than this value, it will be throttled down to this value. As above,
this configuration is only effective when using file-based data sources such as Parquet, ORC
and JSON.
</td>
<td>2.1.1</td>
</tr>
</table>
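<p>For example, since the automatic broadcast decision relies on table statistics, a typical sequence is to compute statistics first and then adjust the threshold. A brief sketch (assuming a Hive Metastore table named <code class="language-plaintext highlighter-rouge">src</code>; the 50 MB value is illustrative, not a recommendation):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Compute table-level statistics so the planner can estimate the table size.
spark.sql("ANALYZE TABLE src COMPUTE STATISTICS noscan")

// Raise the broadcast threshold to 50 MB; setting it to -1 disables broadcasting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)
</code></pre></div></div>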
<h2 id="join-strategy-hints-for-sql-queries">Join Strategy Hints for SQL Queries</h2>
<p>The join strategy hints, namely <code class="language-plaintext highlighter-rouge">BROADCAST</code>, <code class="language-plaintext highlighter-rouge">MERGE</code>, <code class="language-plaintext highlighter-rouge">SHUFFLE_HASH</code> and <code class="language-plaintext highlighter-rouge">SHUFFLE_REPLICATE_NL</code>,
instruct Spark to use the hinted strategy on each specified relation when joining them with another
relation. For example, when the <code class="language-plaintext highlighter-rouge">BROADCAST</code> hint is used on table &#8216;t1&#8217;, broadcast join (either
broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key)
with &#8216;t1&#8217; as the build side will be prioritized by Spark even if the size of table &#8216;t1&#8217; suggested
by the statistics is above the configuration <code class="language-plaintext highlighter-rouge">spark.sql.autoBroadcastJoinThreshold</code>.</p>
<p>When different join strategy hints are specified on both sides of a join, Spark prioritizes the
<code class="language-plaintext highlighter-rouge">BROADCAST</code> hint over the <code class="language-plaintext highlighter-rouge">MERGE</code> hint over the <code class="language-plaintext highlighter-rouge">SHUFFLE_HASH</code> hint over the <code class="language-plaintext highlighter-rouge">SHUFFLE_REPLICATE_NL</code>
hint. When both sides are specified with the <code class="language-plaintext highlighter-rouge">BROADCAST</code> hint or the <code class="language-plaintext highlighter-rouge">SHUFFLE_HASH</code> hint, Spark will
pick the build side based on the join type and the sizes of the relations.</p>
<p>Note that there is no guarantee that Spark will choose the join strategy specified in the hint since
a specific strategy may not support all join types.</p>
<div class="codetabs">
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nv">spark</span><span class="o">.</span><span class="py">table</span><span class="o">(</span><span class="s">"src"</span><span class="o">).</span><span class="py">join</span><span class="o">(</span><span class="nv">spark</span><span class="o">.</span><span class="py">table</span><span class="o">(</span><span class="s">"records"</span><span class="o">).</span><span class="py">hint</span><span class="o">(</span><span class="s">"broadcast"</span><span class="o">),</span> <span class="s">"key"</span><span class="o">).</span><span class="py">show</span><span class="o">()</span></code></pre></figure>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">spark</span><span class="o">.</span><span class="na">table</span><span class="o">(</span><span class="s">"src"</span><span class="o">).</span><span class="na">join</span><span class="o">(</span><span class="n">spark</span><span class="o">.</span><span class="na">table</span><span class="o">(</span><span class="s">"records"</span><span class="o">).</span><span class="na">hint</span><span class="o">(</span><span class="s">"broadcast"</span><span class="o">),</span> <span class="s">"key"</span><span class="o">).</span><span class="na">show</span><span class="o">();</span></code></pre></figure>
</div>
<div data-lang="python">
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">spark</span><span class="p">.</span><span class="n">table</span><span class="p">(</span><span class="s">"src"</span><span class="p">).</span><span class="n">join</span><span class="p">(</span><span class="n">spark</span><span class="p">.</span><span class="n">table</span><span class="p">(</span><span class="s">"records"</span><span class="p">).</span><span class="n">hint</span><span class="p">(</span><span class="s">"broadcast"</span><span class="p">),</span> <span class="s">"key"</span><span class="p">).</span><span class="n">show</span><span class="p">()</span></code></pre></figure>
</div>
<div data-lang="r">
<figure class="highlight"><pre><code class="language-r" data-lang="r"><span class="n">src</span><span class="w"> </span><span class="o">&lt;-</span><span class="w"> </span><span class="n">sql</span><span class="p">(</span><span class="s2">"SELECT * FROM src"</span><span class="p">)</span><span class="w">
</span><span class="n">records</span><span class="w"> </span><span class="o">&lt;-</span><span class="w"> </span><span class="n">sql</span><span class="p">(</span><span class="s2">"SELECT * FROM records"</span><span class="p">)</span><span class="w">
</span><span class="n">head</span><span class="p">(</span><span class="n">join</span><span class="p">(</span><span class="n">src</span><span class="p">,</span><span class="w"> </span><span class="n">hint</span><span class="p">(</span><span class="n">records</span><span class="p">,</span><span class="w"> </span><span class="s2">"broadcast"</span><span class="p">),</span><span class="w"> </span><span class="n">src</span><span class="o">$</span><span class="n">key</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="n">records</span><span class="o">$</span><span class="n">key</span><span class="p">))</span></code></pre></figure>
</div>
<div data-lang="SQL">
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="c1">-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint</span>
<span class="k">SELECT</span> <span class="cm">/*+ BROADCAST(r) */</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">records</span> <span class="n">r</span> <span class="k">JOIN</span> <span class="n">src</span> <span class="n">s</span> <span class="k">ON</span> <span class="n">r</span><span class="p">.</span><span class="k">key</span> <span class="o">=</span> <span class="n">s</span><span class="p">.</span><span class="k">key</span></code></pre></figure>
</div>
</div>
<p>For more details please refer to the documentation of <a href="sql-ref-syntax-qry-select-hints.html#join-hints">Join Hints</a>.</p>
<h2 id="coalesce-hints-for-sql-queries">Coalesce Hints for SQL Queries</h2>
<p>Coalesce hints allow Spark SQL users to control the number of output files just like
<code class="language-plaintext highlighter-rouge">coalesce</code>, <code class="language-plaintext highlighter-rouge">repartition</code> and <code class="language-plaintext highlighter-rouge">repartitionByRange</code> in the Dataset API; they can be used for performance
tuning and for reducing the number of output files. The &#8220;COALESCE&#8221; hint only has a partition number as a
parameter. The &#8220;REPARTITION&#8221; hint takes a partition number, columns, both, or neither as parameters.
The &#8220;REPARTITION_BY_RANGE&#8221; hint must have column names; a partition number is optional.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>SELECT /*+ COALESCE(3) */ * FROM t
SELECT /*+ REPARTITION(3) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
SELECT /*+ REBALANCE */ * FROM t
SELECT /*+ REBALANCE(c) */ * FROM t
</code></pre></div></div>
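<p>For comparison, a short sketch of the corresponding Dataset API calls (assuming <code class="language-plaintext highlighter-rouge">df</code> is an existing DataFrame with a column <code class="language-plaintext highlighter-rouge">c</code>):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import org.apache.spark.sql.functions.col

df.coalesce(3)                      // like the COALESCE(3) hint: no full shuffle
df.repartition(3)                   // like REPARTITION(3)
df.repartition(col("c"))            // like REPARTITION(c)
df.repartition(3, col("c"))         // like REPARTITION(3, c)
df.repartitionByRange(3, col("c"))  // like REPARTITION_BY_RANGE(3, c)
</code></pre></div></div>
<p>Note that the <code class="language-plaintext highlighter-rouge">REBALANCE</code> hint has no direct Dataset API counterpart in this release.</p>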
<p>For more details please refer to the documentation of <a href="sql-ref-syntax-qry-select-hints.html#partitioning-hints">Partitioning Hints</a>.</p>
<h2 id="adaptive-query-execution">Adaptive Query Execution</h2>
<p>Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of runtime statistics to choose the most efficient query execution plan; it is enabled by default since Apache Spark 3.2.0. Spark SQL can turn AQE on and off via <code class="language-plaintext highlighter-rouge">spark.sql.adaptive.enabled</code> as an umbrella configuration. As of Spark 3.0, there are three major features in AQE: coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skew join optimization.</p>
<h3 id="coalescing-post-shuffle-partitions">Coalescing Post Shuffle Partitions</h3>
<p>This feature coalesces the post-shuffle partitions based on the map output statistics when both <code class="language-plaintext highlighter-rouge">spark.sql.adaptive.enabled</code> and <code class="language-plaintext highlighter-rouge">spark.sql.adaptive.coalescePartitions.enabled</code> configurations are true. It simplifies the tuning of the shuffle partition number when running queries: you do not need to set a shuffle partition number that fits your dataset, because Spark can pick the proper number at runtime once you set a large enough initial number of shuffle partitions via the <code class="language-plaintext highlighter-rouge">spark.sql.adaptive.coalescePartitions.initialPartitionNum</code> configuration.</p>
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.sql.adaptive.coalescePartitions.enabled</code></td>
<td>true</td>
<td>
When true and <code>spark.sql.adaptive.enabled</code> is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>), to avoid too many small tasks.
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.sql.adaptive.coalescePartitions.parallelismFirst</code></td>
<td>true</td>
<td>
When true, Spark ignores the target size specified by <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code> (default 64MB) when coalescing contiguous shuffle partitions, and only respects the minimum partition size specified by <code>spark.sql.adaptive.coalescePartitions.minPartitionSize</code> (default 1MB), to maximize parallelism. This is to avoid performance regressions when enabling adaptive query execution. It's recommended to set this config to false and respect the target size specified by <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
</td>
<td>3.2.0</td>
</tr>
<tr>
<td><code>spark.sql.adaptive.coalescePartitions.minPartitionSize</code></td>
<td>1MB</td>
<td>
The minimum size of shuffle partitions after coalescing. Its value can be at most 20% of <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>. This is useful when the target size is ignored during partition coalescing, which is the default case.
</td>
<td>3.2.0</td>
</tr>
<tr>
<td><code>spark.sql.adaptive.coalescePartitions.initialPartitionNum</code></td>
<td>(none)</td>
<td>
The initial number of shuffle partitions before coalescing. If not set, it defaults to <code>spark.sql.shuffle.partitions</code>. This configuration only has an effect when <code>spark.sql.adaptive.enabled</code> and <code>spark.sql.adaptive.coalescePartitions.enabled</code> are both enabled.
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code></td>
<td>64 MB</td>
<td>
The advisory size in bytes of the shuffle partition during adaptive optimization (when <code>spark.sql.adaptive.enabled</code> is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partitions.
</td>
<td>3.0.0</td>
</tr>
</table>
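<p>A minimal sketch of how these settings combine (the values are illustrative, not recommendations):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Both flags below default to true in Spark 3.2; shown here for completeness.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)

// Start with a deliberately large partition count; AQE coalesces it at runtime.
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", 1000)

// Respect the 64 MB advisory target size rather than maximizing parallelism.
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", false)
</code></pre></div></div>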
<h3 id="converting-sort-merge-join-to-broadcast-join">Converting sort-merge join to broadcast join</h3>
<p>AQE converts sort-merge join to broadcast hash join when the runtime statistics of either join side are smaller than the adaptive broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it&#8217;s better than continuing the sort-merge join, as we can skip sorting both join sides and read shuffle files locally to save network traffic (if <code class="language-plaintext highlighter-rouge">spark.sql.adaptive.localShuffleReader.enabled</code> is true).</p>
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.sql.adaptive.autoBroadcastJoinThreshold</code></td>
<td>(none)</td>
<td>
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1, broadcasting can be disabled. The default value is the same as <code>spark.sql.autoBroadcastJoinThreshold</code>. Note that this config is used only in the adaptive framework.
</td>
<td>3.2.0</td>
</tr>
</table>
<h3 id="converting-sort-merge-join-to-shuffled-hash-join">Converting sort-merge join to shuffled hash join</h3>
<p>AQE converts sort-merge join to shuffled hash join when all post-shuffle partitions are smaller than a threshold; the maximum threshold is controlled by the config <code class="language-plaintext highlighter-rouge">spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold</code>.</p>
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold</code></td>
<td>0</td>
<td>
Configures the maximum size in bytes per partition that can be allowed to build a local hash map. If this value is not smaller than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code> and all the partition sizes are not larger than this config, join selection prefers to use shuffled hash join instead of sort-merge join regardless of the value of <code>spark.sql.join.preferSortMergeJoin</code>.
</td>
<td>3.2.0</td>
</tr>
</table>
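<p>A brief sketch covering both join-conversion thresholds described above (the values are illustrative):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Let AQE broadcast a join side whose runtime size is below 10 MB
// (this config is consulted only by the adaptive framework).
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "10MB")

// Prefer shuffled hash join when every post-shuffle partition fits in 64 MB.
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "64MB")
</code></pre></div></div>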
<h3 id="optimizing-skew-join">Optimizing Skew Join</h3>
<p>Data skew can severely degrade the performance of join queries. This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both <code class="language-plaintext highlighter-rouge">spark.sql.adaptive.enabled</code> and <code class="language-plaintext highlighter-rouge">spark.sql.adaptive.skewJoin.enabled</code> configurations are enabled.</p>
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.sql.adaptive.skewJoin.enabled</code></td>
<td>true</td>
<td>
When true and <code>spark.sql.adaptive.enabled</code> is true, Spark dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed partitions.
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code></td>
<td>5</td>
<td>
A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and is also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code>.
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code></td>
<td>256MB</td>
<td>
A partition is considered skewed if its size in bytes is larger than this threshold and is also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> multiplied by the median partition size. Ideally, this config should be set larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
</td>
<td>3.0.0</td>
</tr>
</table>
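<p>A minimal sketch of tuning the skew-join knobs (the values simply restate the defaults above):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Both flags must be true for skew handling to take effect.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)

// A partition is treated as skewed only if it exceeds 5x the median size
// AND is larger than 256 MB.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
</code></pre></div></div>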
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-3.5.1.min.js"></script>
<script src="js/vendor/bootstrap.bundle.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/docsearch.js@2/dist/cdn/docsearch.min.js"></script>
<script type="text/javascript">
// DocSearch is entirely free and automated. DocSearch is built in two parts:
// 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link
// in your website and extracts content from every page it traverses. It then pushes this
// content to an Algolia index.
// 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index
// to your search input and display its results in a dropdown UI. If you want more
// details on how DocSearch works, check the DocSearch docs.
docsearch({
apiKey: 'b18ca3732c502995563043aa17bc6ecb',
indexName: 'apache_spark',
inputSelector: '#docsearch-input',
enhancedSearchInput: true,
algoliaOptions: {
'facetFilters': ["version:3.2.0"]
},
debug: false // Set debug to true if you want to inspect the dropdown
});
</script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>