|  |  | 
|  | <!DOCTYPE html> | 
|  | <!--[if lt IE 7]>      <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--> | 
|  | <!--[if IE 7]>         <html class="no-js lt-ie9 lt-ie8"> <![endif]--> | 
|  | <!--[if IE 8]>         <html class="no-js lt-ie9"> <![endif]--> | 
|  | <!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]--> | 
|  | <head> | 
|  | <meta charset="utf-8"> | 
|  | <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> | 
|  | <meta name="viewport" content="width=device-width, initial-scale=1.0"> | 
|  |  | 
|  | <title>Integration with Cloud Infrastructures - Spark 3.5.0 Documentation</title> | 
|  |  | 
|  | <meta name="description" content="Introduction to cloud storage support in Apache Spark 3.5.0"> | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  | <link rel="stylesheet" href="css/bootstrap.min.css"> | 
|  | <link rel="preconnect" href="https://fonts.googleapis.com"> | 
|  | <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> | 
|  | <link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&Courier+Prime:wght@400;700&display=swap" rel="stylesheet"> | 
|  | <link href="css/custom.css" rel="stylesheet"> | 
|  | <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> | 
|  |  | 
|  | <link rel="stylesheet" href="css/pygments-default.css"> | 
|  | <link rel="stylesheet" href="css/docsearch.min.css" /> | 
|  | <link rel="stylesheet" href="css/docsearch.css"> | 
|  |  | 
|  | <!-- Matomo --> | 
|  | <script type="text/javascript"> | 
|  | var _paq = window._paq = window._paq || []; | 
|  | /* tracker methods like "setCustomDimension" should be called before "trackPageView" */ | 
|  | _paq.push(["disableCookies"]); | 
|  | _paq.push(['trackPageView']); | 
|  | _paq.push(['enableLinkTracking']); | 
|  | (function() { | 
|  | var u="https://analytics.apache.org/"; | 
|  | _paq.push(['setTrackerUrl', u+'matomo.php']); | 
|  | _paq.push(['setSiteId', '40']); | 
|  | var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; | 
|  | g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); | 
|  | })(); | 
|  | </script> | 
|  | <!-- End Matomo Code --> | 
|  | </head> | 
|  | <body class="global"> | 
|  | <!--[if lt IE 7]> | 
|  | <p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p> | 
|  | <![endif]--> | 
|  |  | 
|  | <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> | 
|  |  | 
|  | <nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar"> | 
|  | <div class="navbar-brand"><a href="index.html"> | 
|  | <img src="img/spark-logo-rev.svg" width="141" height="72"/></a><span class="version">3.5.0</span> | 
|  | </div> | 
|  | <button class="navbar-toggler" type="button" data-toggle="collapse" | 
|  | data-target="#navbarCollapse" aria-controls="navbarCollapse" | 
|  | aria-expanded="false" aria-label="Toggle navigation"> | 
|  | <span class="navbar-toggler-icon"></span> | 
|  | </button> | 
|  | <div class="collapse navbar-collapse" id="navbarCollapse"> | 
|  | <ul class="navbar-nav me-auto"> | 
|  | <li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li> | 
|  |  | 
|  | <li class="nav-item dropdown"> | 
|  | <a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a> | 
|  | <div class="dropdown-menu" aria-labelledby="navbarQuickStart"> | 
|  | <a class="dropdown-item" href="quick-start.html">Quick Start</a> | 
|  | <a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a> | 
|  | <a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a> | 
|  | <a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a> | 
|  | <a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a> | 
|  | <a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a> | 
|  | <a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a> | 
|  | <a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a> | 
|  | <a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a> | 
|  | </div> | 
|  | </li> | 
|  |  | 
|  | <li class="nav-item dropdown"> | 
|  | <a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a> | 
|  | <div class="dropdown-menu" aria-labelledby="navbarAPIDocs"> | 
|  | <a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a> | 
|  | <a class="dropdown-item" href="api/java/index.html">Java</a> | 
|  | <a class="dropdown-item" href="api/python/index.html">Python</a> | 
|  | <a class="dropdown-item" href="api/R/index.html">R</a> | 
|  | <a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a> | 
|  | </div> | 
|  | </li> | 
|  |  | 
|  | <li class="nav-item dropdown"> | 
|  | <a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a> | 
|  | <div class="dropdown-menu" aria-labelledby="navbarDeploying"> | 
|  | <a class="dropdown-item" href="cluster-overview.html">Overview</a> | 
|  | <a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a> | 
|  | <div class="dropdown-divider"></div> | 
|  | <a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a> | 
|  | <a class="dropdown-item" href="running-on-mesos.html">Mesos</a> | 
|  | <a class="dropdown-item" href="running-on-yarn.html">YARN</a> | 
|  | <a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a> | 
|  | </div> | 
|  | </li> | 
|  |  | 
|  | <li class="nav-item dropdown"> | 
|  | <a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a> | 
|  | <div class="dropdown-menu" aria-labelledby="navbarMore"> | 
|  | <a class="dropdown-item" href="configuration.html">Configuration</a> | 
|  | <a class="dropdown-item" href="monitoring.html">Monitoring</a> | 
|  | <a class="dropdown-item" href="tuning.html">Tuning Guide</a> | 
|  | <a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a> | 
|  | <a class="dropdown-item" href="security.html">Security</a> | 
|  | <a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a> | 
|  | <a class="dropdown-item" href="migration-guide.html">Migration Guide</a> | 
|  | <div class="dropdown-divider"></div> | 
|  | <a class="dropdown-item" href="building-spark.html">Building Spark</a> | 
|  | <a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a> | 
|  | <a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a> | 
|  | </div> | 
|  | </li> | 
|  |  | 
|  | <li class="nav-item"> | 
|  | <input type="text" id="docsearch-input" placeholder="Search the docs…"> | 
|  | </li> | 
|  | </ul> | 
|  | <!--<span class="navbar-text navbar-right"><span class="version-text">v3.5.0</span></span>--> | 
|  | </div> | 
|  | </nav> | 
|  |  | 
|  |  | 
|  |  | 
|  | <div class="container"> | 
|  |  | 
|  |  | 
|  | <div class="content mr-3" id="content"> | 
|  |  | 
|  |  | 
|  | <h1 class="title">Integration with Cloud Infrastructures</h1> | 
|  |  | 
|  |  | 
|  | <ul id="markdown-toc"> | 
|  | <li><a href="#introduction" id="markdown-toc-introduction">Introduction</a>    <ul> | 
|  | <li><a href="#important-cloud-object-stores-are-not-real-filesystems" id="markdown-toc-important-cloud-object-stores-are-not-real-filesystems">Important: Cloud Object Stores are Not Real Filesystems</a></li> | 
|  | <li><a href="#consistency" id="markdown-toc-consistency">Consistency</a></li> | 
|  | <li><a href="#installation" id="markdown-toc-installation">Installation</a></li> | 
|  | <li><a href="#authenticating" id="markdown-toc-authenticating">Authenticating</a></li> | 
|  | </ul> | 
|  | </li> | 
|  | <li><a href="#configuring" id="markdown-toc-configuring">Configuring</a>    <ul> | 
|  | <li><a href="#recommended-settings-for-writing-to-object-stores" id="markdown-toc-recommended-settings-for-writing-to-object-stores">Recommended settings for writing to object stores</a></li> | 
|  | <li><a href="#parquet-io-settings" id="markdown-toc-parquet-io-settings">Parquet I/O Settings</a></li> | 
|  | <li><a href="#orc-io-settings" id="markdown-toc-orc-io-settings">ORC I/O Settings</a></li> | 
|  | </ul> | 
|  | </li> | 
|  | <li><a href="#spark-streaming-and-object-storage" id="markdown-toc-spark-streaming-and-object-storage">Spark Streaming and Object Storage</a></li> | 
|  | <li><a href="#committing-work-into-cloud-storage-safely-and-fast" id="markdown-toc-committing-work-into-cloud-storage-safely-and-fast">Committing work into cloud storage safely and fast.</a>    <ul> | 
|  | <li><a href="#hadoop-s3a-committers" id="markdown-toc-hadoop-s3a-committers">Hadoop S3A committers</a></li> | 
|  | <li><a href="#amazon-emr-the-emrfs-s3-optimized-committer" id="markdown-toc-amazon-emr-the-emrfs-s3-optimized-committer">Amazon EMR: the EMRFS S3-optimized committer</a></li> | 
|  | <li><a href="#azure-and-google-cloud-storage-mapreduce-intermediate-manifest-committer" id="markdown-toc-azure-and-google-cloud-storage-mapreduce-intermediate-manifest-committer">Azure and Google cloud storage: MapReduce Intermediate Manifest Committer.</a></li> | 
|  | <li><a href="#ibm-cloud-object-storage-stocator" id="markdown-toc-ibm-cloud-object-storage-stocator">IBM Cloud Object Storage: Stocator</a></li> | 
|  | </ul> | 
|  | </li> | 
|  | <li><a href="#cloud-committers-and-insert-overwrite-table" id="markdown-toc-cloud-committers-and-insert-overwrite-table">Cloud Committers and <code class="language-plaintext highlighter-rouge">INSERT OVERWRITE TABLE</code></a></li> | 
|  | <li><a href="#further-reading" id="markdown-toc-further-reading">Further Reading</a></li> | 
|  | </ul> | 
|  |  | 
|  | <h2 id="introduction">Introduction</h2> | 
|  |  | 
|  | <p>All major cloud providers offer persistent data storage in <em>object stores</em>. | 
|  | These are not classic “POSIX” file systems. | 
|  | In order to store hundreds of petabytes of data without any single points of failure, | 
|  | object stores replace the classic file system directory tree | 
|  | with a simpler model of <code class="language-plaintext highlighter-rouge">object-name => data</code>. To enable remote access, operations | 
|  | on objects are usually offered as (slow) HTTP REST operations.</p> | 
|  |  | 
|  | <p>Spark can read and write data in object stores through filesystem connectors implemented | 
|  | in Hadoop or provided by the infrastructure suppliers themselves. | 
|  | These connectors make the object stores look <em>almost</em> like file systems, with directories and files | 
|  | and the classic operations on them such as list, delete and rename.</p> | 
|  |  | 
|  | <h3 id="important-cloud-object-stores-are-not-real-filesystems">Important: Cloud Object Stores are Not Real Filesystems</h3> | 
|  |  | 
|  | <p>While the stores appear to be filesystems, underneath | 
|  | they are still object stores, <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/introduction.html">and the difference is significant</a></p> | 
|  |  | 
|  | <p>They cannot be used as a direct replacement for a cluster filesystem such as HDFS | 
|  | <em>except where this is explicitly stated</em>.</p> | 
|  |  | 
|  | <p>Key differences are:</p> | 
|  |  | 
|  | <ul> | 
|  | <li>The means by which directories are emulated may make working with them slow.</li> | 
|  | <li>Rename operations may be very slow and, on failure, leave the store in an unknown state.</li> | 
|  | <li>Seeking within a file may require new HTTP calls, hurting performance.</li> | 
|  | </ul> | 
|  |  | 
|  | <p>How does this affect Spark?</p> | 
|  |  | 
|  | <ol> | 
|  | <li>Reading and writing data can be significantly slower than working with a normal filesystem.</li> | 
|  | <li>Some directory structures may be very inefficient to scan during query split calculation.</li> | 
|  | <li>The rename-based algorithm by which Spark normally commits work when saving an RDD, DataFrame or Dataset | 
|  | is potentially both slow and unreliable.</li> | 
|  | </ol> | 
|  |  | 
|  | <p>For these reasons, it is not always safe to use an object store as a direct destination of queries, or as | 
|  | an intermediate store in a chain of queries. Consult the documentation of the object store and its | 
|  | connector to determine which uses are considered safe.</p> | 
|  |  | 
|  | <h3 id="consistency">Consistency</h3> | 
|  |  | 
|  | <p>As of 2021, the object stores of Amazon (S3), Google Cloud (GCS) and Microsoft (Azure Storage, ADLS Gen1, ADLS Gen2) are all <em>consistent</em>.</p> | 
|  |  | 
|  | <p>This means that as soon as a file is written/updated it can be listed, viewed and opened by other processes | 
|  | -and the latest version will be retrieved. Inconsistency was historically a known issue with AWS S3, especially | 
|  | the 404 caching of HEAD requests made before an object was created.</p> | 
|  |  | 
|  | <p>Even so: none of the store connectors provide any guarantees as to how their clients cope with objects | 
|  | which are overwritten while a stream is reading them. Do not assume that the old file can be safely | 
|  | read, nor that there is any bounded time period for changes to become visible -or indeed, that | 
|  | the clients will not simply fail if a file being read is overwritten.</p> | 
|  |  | 
|  | <p>For this reason: avoid overwriting files where it is known/likely that other clients | 
|  | will be actively reading them.</p> | 
|  |  | 
|  | <p>Other object stores are <em>inconsistent</em>.</p> | 
|  |  | 
|  | <p>This includes <a href="https://docs.openstack.org/swift/latest/">OpenStack Swift</a>.</p> | 
|  |  | 
|  | <p>Such stores are not always safe to use as a destination of work -consult | 
|  | each store’s specific documentation.</p> | 
|  |  | 
|  | <h3 id="installation">Installation</h3> | 
|  |  | 
|  | <p>With the relevant libraries on the classpath and Spark configured with valid credentials, | 
|  | objects can be read or written by using their URLs as the path to data. | 
|  | For example <code class="language-plaintext highlighter-rouge">sparkContext.textFile("s3a://landsat-pds/scene_list.gz")</code> will create | 
|  | an RDD of the file <code class="language-plaintext highlighter-rouge">scene_list.gz</code> stored in S3, using the s3a connector.</p> | 
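|  |  | 
|  | <p>Reading and writing DataFrames works the same way. A minimal sketch, assuming an existing <code class="language-plaintext highlighter-rouge">SparkSession</code> named <code class="language-plaintext highlighter-rouge">spark</code>; the bucket names and paths are placeholders:</p> | 
|  |  | 
|  | <div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Read a CSV object from S3 and write the result back out as Parquet to another store. | 
|  | // Assumes the relevant connectors are on the classpath and credentials are configured. | 
|  | val df = spark.read.option("header", "true").csv("s3a://example-bucket/input/data.csv") | 
|  | df.write.mode("overwrite").parquet("gs://example-bucket/output/") | 
|  | </code></pre></div></div> | 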
|  |  | 
|  | <p>To add the relevant libraries to an application’s classpath, include the <code class="language-plaintext highlighter-rouge">hadoop-cloud</code> | 
|  | module and its dependencies.</p> | 
|  |  | 
|  | <p>In Maven, add the following to the <code class="language-plaintext highlighter-rouge">pom.xml</code> file, assuming <code class="language-plaintext highlighter-rouge">spark.version</code> | 
|  | is set to the chosen version of Spark:</p> | 
|  |  | 
|  | <figure class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt"><dependencyManagement></span> | 
|  | ... | 
|  | <span class="nt"><dependency></span> | 
|  | <span class="nt"><groupId></span>org.apache.spark<span class="nt"></groupId></span> | 
|  | <span class="nt"><artifactId></span>spark-hadoop-cloud_2.12<span class="nt"></artifactId></span> | 
|  | <span class="nt"><version></span>${spark.version}<span class="nt"></version></span> | 
|  | <span class="nt"><scope></span>provided<span class="nt"></scope></span> | 
|  | <span class="nt"></dependency></span> | 
|  | ... | 
|  | <span class="nt"></dependencyManagement></span></code></pre></figure> | 
|  |  | 
|  | <p>Commercial products based on Apache Spark generally directly set up the classpath | 
|  | for talking to cloud infrastructures, in which case this module may not be needed.</p> | 
|  |  | 
|  | <h3 id="authenticating">Authenticating</h3> | 
|  |  | 
|  | <p>Spark jobs must authenticate with the object stores to access data within them.</p> | 
|  |  | 
|  | <ol> | 
|  | <li>When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.</li> | 
|  | <li><code class="language-plaintext highlighter-rouge">spark-submit</code> reads the <code class="language-plaintext highlighter-rouge">AWS_ACCESS_KEY_ID</code>, <code class="language-plaintext highlighter-rouge">AWS_SECRET_ACCESS_KEY</code> | 
|  | and <code class="language-plaintext highlighter-rouge">AWS_SESSION_TOKEN</code> environment variables and sets the associated authentication options | 
|  | for the <code class="language-plaintext highlighter-rouge">s3n</code> and <code class="language-plaintext highlighter-rouge">s3a</code> connectors to Amazon S3.</li> | 
|  | <li>In a Hadoop cluster, settings may be set in the <code class="language-plaintext highlighter-rouge">core-site.xml</code> file.</li> | 
|  | <li>Authentication details may be manually added to the Spark configuration in <code class="language-plaintext highlighter-rouge">spark-defaults.conf</code></li> | 
|  | <li>Alternatively, they can be programmatically set in the <code class="language-plaintext highlighter-rouge">SparkConf</code> instance used to configure | 
|  | the application’s <code class="language-plaintext highlighter-rouge">SparkContext</code>, as sketched after this list.</li> | 
|  | </ol> | 
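|  |  | 
|  | <p>As a sketch of the last option, S3A credentials can be set programmatically through the <code class="language-plaintext highlighter-rouge">spark.hadoop.</code> prefix; the values here are read from the environment purely for illustration:</p> | 
|  |  | 
|  | <div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import org.apache.spark.SparkConf | 
|  | import org.apache.spark.sql.SparkSession | 
|  |  | 
|  | // Hadoop filesystem options can be passed through Spark by prefixing them with "spark.hadoop.". | 
|  | // Never hard-code secrets; here they are taken from environment variables as placeholders. | 
|  | val conf = new SparkConf() | 
|  |   .set("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID")) | 
|  |   .set("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY")) | 
|  |  | 
|  | val spark = SparkSession.builder().config(conf).getOrCreate() | 
|  | </code></pre></div></div> | 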
|  |  | 
|  | <p><em>Important: never check authentication secrets into source code repositories, | 
|  | especially public ones</em></p> | 
|  |  | 
|  | <p>Consult <a href="https://hadoop.apache.org/docs/current/">the Hadoop documentation</a> for the relevant | 
|  | configuration and security options.</p> | 
|  |  | 
|  | <h2 id="configuring">Configuring</h2> | 
|  |  | 
|  | <p>Each cloud connector has its own set of configuration parameters, again, | 
|  | consult the relevant documentation.</p> | 
|  |  | 
|  | <h3 id="recommended-settings-for-writing-to-object-stores">Recommended settings for writing to object stores</h3> | 
|  |  | 
|  | <p>For object stores whose consistency model means that rename-based commits are safe, | 
|  | use the <code class="language-plaintext highlighter-rouge">FileOutputCommitter</code> v2 algorithm for performance; use v1 for safety.</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 | 
|  | </code></pre></div></div> | 
|  |  | 
|  | <p>This does less renaming at the end of a job than the “version 1” algorithm. | 
|  | As it still uses <code class="language-plaintext highlighter-rouge">rename()</code> to commit files, it is unsafe to use | 
|  | when the object store does not have consistent metadata/listings.</p> | 
|  |  | 
|  | <p>The committer can also be set to ignore failures when cleaning up temporary | 
|  | files; this reduces the risk that a transient network problem is escalated into a | 
|  | job failure:</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true | 
|  | </code></pre></div></div> | 
|  |  | 
|  | <p>The original v1 commit algorithm renames the output of successful tasks | 
|  | to a job attempt directory, and then renames all the files in that directory | 
|  | into the final destination during the job commit phase:</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1 | 
|  | </code></pre></div></div> | 
|  |  | 
|  | <p>The slow performance of mimicked renames on Amazon S3 makes this algorithm | 
|  | very, very slow. The recommended solution to this is to switch to an S3 “Zero Rename” | 
|  | committer (see below).</p> | 
|  |  | 
|  | <p>For reference, here are the performance and safety characteristics of | 
|  | different stores and connectors when renaming directories:</p> | 
|  |  | 
|  | <table> | 
|  | <thead> | 
|  | <tr> | 
|  | <th>Store</th> | 
|  | <th>Connector</th> | 
|  | <th>Directory Rename Safety</th> | 
|  | <th>Rename Performance</th> | 
|  | </tr> | 
|  | </thead> | 
|  | <tbody> | 
|  | <tr> | 
|  | <td>Amazon S3</td> | 
|  | <td>s3a</td> | 
|  | <td>Unsafe</td> | 
|  | <td>O(data)</td> | 
|  | </tr> | 
|  | <tr> | 
|  | <td>Azure Storage</td> | 
|  | <td>wasb</td> | 
|  | <td>Safe</td> | 
|  | <td>O(files)</td> | 
|  | </tr> | 
|  | <tr> | 
|  | <td>Azure Datalake Gen 2</td> | 
|  | <td>abfs</td> | 
|  | <td>Safe</td> | 
|  | <td>O(1)</td> | 
|  | </tr> | 
|  | <tr> | 
|  | <td>Google Cloud Storage</td> | 
|  | <td>gs</td> | 
|  | <td>Mixed</td> | 
|  | <td>O(files)</td> | 
|  | </tr> | 
|  | </tbody> | 
|  | </table> | 
|  |  | 
|  | <ol> | 
|  | <li>As storing temporary files can run up charges, delete | 
|  | directories called <code class="language-plaintext highlighter-rouge">"_temporary"</code> on a regular basis.</li> | 
|  | <li>For AWS S3, set a limit on how long multipart uploads can remain outstanding. | 
|  | This avoids incurring bills from incomplete uploads; see the example after this list.</li> | 
|  | <li>For Google Cloud, directory rename is file-by-file. Consider using the v2 committer | 
|  | and only writing code which generates idempotent output -including filenames, | 
|  | as it is <em>no more unsafe</em> than the v1 committer, and faster.</li> | 
|  | </ol> | 
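|  |  | 
|  | <p>As an example of point 2, the S3A connector can purge outstanding multipart uploads older than a given age when a filesystem client is created. The values below are illustrative; an S3 lifecycle rule on the bucket achieves the same goal:</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.hadoop.fs.s3a.multipart.purge true | 
|  | spark.hadoop.fs.s3a.multipart.purge.age 86400 | 
|  | </code></pre></div></div> | 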
|  |  | 
|  | <h3 id="parquet-io-settings">Parquet I/O Settings</h3> | 
|  |  | 
|  | <p>For optimal performance when working with Parquet data use the following settings:</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.hadoop.parquet.enable.summary-metadata false | 
|  | spark.sql.parquet.mergeSchema false | 
|  | spark.sql.parquet.filterPushdown true | 
|  | spark.sql.hive.metastorePartitionPruning true | 
|  | </code></pre></div></div> | 
|  |  | 
|  | <p>These minimise the amount of data read during queries.</p> | 
|  |  | 
|  | <h3 id="orc-io-settings">ORC I/O Settings</h3> | 
|  |  | 
|  | <p>For best performance when working with ORC data, use these settings:</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.sql.orc.filterPushdown true | 
|  | spark.sql.orc.splits.include.file.footer true | 
|  | spark.sql.orc.cache.stripe.details.size 10000 | 
|  | spark.sql.hive.metastorePartitionPruning true | 
|  | </code></pre></div></div> | 
|  |  | 
|  | <p>Again, these minimise the amount of data read during queries.</p> | 
|  |  | 
|  | <h2 id="spark-streaming-and-object-storage">Spark Streaming and Object Storage</h2> | 
|  |  | 
|  | <p>Spark Streaming can monitor files added to object stores by | 
|  | creating a <code class="language-plaintext highlighter-rouge">FileInputDStream</code> over a path in the store, through a call to | 
|  | <code class="language-plaintext highlighter-rouge">StreamingContext.textFileStream()</code>, as sketched below.</p> | 
|  |  | 
|  | <ol> | 
|  | <li> | 
|  | <p>The time to scan for new files is proportional to the number of files | 
|  | under the path, not the number of <em>new</em> files, so it can become a slow operation. | 
|  | The size of the window needs to be set to handle this.</p> | 
|  | </li> | 
|  | <li> | 
|  | <p>Files only appear in an object store once they are completely written; there | 
|  | is no need for a workflow of write-then-rename to ensure that files aren’t picked up | 
|  | while they are still being written. Applications can write straight to the monitored directory.</p> | 
|  | </li> | 
|  | <li> | 
|  | <p>With the default checkpoint file manager, <code class="language-plaintext highlighter-rouge">FileContextBasedCheckpointFileManager</code>, | 
|  | streams should only be checkpointed to a store implementing a fast and | 
|  | atomic <code class="language-plaintext highlighter-rouge">rename()</code> operation; otherwise checkpointing may be slow and potentially unreliable. | 
|  | On AWS S3 with Hadoop 3.3.1 or later using the S3A connector, the abortable-stream-based checkpoint | 
|  | file manager can be used (by setting the <code class="language-plaintext highlighter-rouge">spark.sql.streaming.checkpointFileManagerClass</code> | 
|  | configuration to <code class="language-plaintext highlighter-rouge">org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager</code>, | 
|  | as shown after this list), which eliminates the slow rename. In this case users must be extra careful to avoid reusing | 
|  | the checkpoint location among multiple queries running in parallel, as that could corrupt | 
|  | the checkpointing data.</p> | 
|  | </li> | 
|  | </ol> | 
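|  |  | 
|  | <p>The checkpoint file manager configuration described in point 3 would appear in <code class="language-plaintext highlighter-rouge">spark-defaults.conf</code> as:</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.sql.streaming.checkpointFileManagerClass org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager | 
|  | </code></pre></div></div> | 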
|  |  | 
|  | <h2 id="committing-work-into-cloud-storage-safely-and-fast">Committing work into cloud storage safely and fast.</h2> | 
|  |  | 
|  | <p>As covered earlier, commit-by-rename is dangerous on any object store which | 
|  | exhibits eventual consistency (example: S3), and often slower than classic | 
|  | filesystem renames.</p> | 
|  |  | 
|  | <p>Some object store connectors provide custom committers to commit tasks and | 
|  | jobs without using rename.</p> | 
|  |  | 
|  | <h3 id="hadoop-s3a-committers">Hadoop S3A committers</h3> | 
|  |  | 
|  | <p>In versions of Spark built with Hadoop 3.1 or later, | 
|  | the hadoop-aws JAR contains committers safe to use for S3 storage | 
|  | accessed via the s3a connector.</p> | 
|  |  | 
|  | <p>Instead of writing data to a temporary directory on the store for renaming, | 
|  | these committers write the files to the final destination, but do not issue | 
|  | the final POST command to make a large “multi-part” upload visible. Those | 
|  | operations are postponed until the job commit itself. As a result, task and | 
|  | job commit are much faster, and task failures do not affect the result.</p> | 
|  |  | 
|  | <p>To switch to the S3A committers, use a version of Spark that was built with Hadoop | 
|  | 3.1 or later, and switch to the committers through the following options.</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.hadoop.fs.s3a.committer.name directory | 
|  | spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol | 
|  | spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter | 
|  | </code></pre></div></div> | 
|  |  | 
|  | <p>It has been tested with the most common formats supported by Spark.</p> | 
|  |  | 
|  | <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">mydataframe</span><span class="p">.</span><span class="n">write</span><span class="p">.</span><span class="nb">format</span><span class="p">(</span><span class="s">"parquet"</span><span class="p">).</span><span class="n">save</span><span class="p">(</span><span class="s">"s3a://bucket/destination"</span><span class="p">)</span> | 
|  | </code></pre></div></div> | 
|  |  | 
|  | <p>More details on these committers can be found in | 
|  | <a href="https://hadoop.apache.org/docs/current/">the latest Hadoop documentation</a> | 
|  | with S3A committer detail covered in | 
|  | <a href="https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html">Committing work to S3 with the S3A Committers</a>.</p> | 
|  |  | 
|  | <p>Note: depending upon the committer used, in-progress statistics may be | 
|  | under-reported with Hadoop versions before 3.3.1.</p> | 
|  |  | 
|  | <h3 id="amazon-emr-the-emrfs-s3-optimized-committer">Amazon EMR: the EMRFS S3-optimized committer</h3> | 
|  |  | 
|  | <p>Amazon EMR has its own S3-aware committers for parquet data. | 
|  | For instructions on use, see | 
|  | <a href="https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html">the EMRFS S3-optimized committer</a></p> | 
|  |  | 
|  | <p>For implementation and performance details, see | 
|  | <a href="https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/">“Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer”</a>.</p> | 
|  |  | 
|  | <h3 id="azure-and-google-cloud-storage-mapreduce-intermediate-manifest-committer">Azure and Google cloud storage: MapReduce Intermediate Manifest Committer.</h3> | 
|  |  | 
|  | <p>Versions of the hadoop-mapreduce-client-core JAR shipped after September 2022 (3.3.5 and later) | 
|  | contain a committer optimized for performance and resilience on | 
|  | Azure ADLS Generation 2 and Google Cloud Storage.</p> | 
|  |  | 
|  | <p>This committer, the “manifest committer”, uses a manifest file to propagate | 
|  | directory listing information from the task committers to the job committer. | 
|  | These manifests can be written atomically, without relying on atomic directory rename, | 
|  | something GCS lacks.</p> | 
|  |  | 
|  | <p>The job committer reads these manifests and will rename files from the task output | 
|  | directories directly into the destination directory, in parallel, with optional | 
|  | rate limiting to avoid throttling IO. | 
|  | This delivers performance and scalability on the object stores.</p> | 
|  |  | 
|  | <p>It is not critical for job correctness to use this with Azure storage; the | 
|  | classic FileOutputCommitter is safe there -however this new committer scales | 
|  | better for large jobs with deep and wide directory trees.</p> | 
|  |  | 
|  | <p>Because Google GCS does not support atomic directory renaming, | 
|  | the manifest committer should be used where available.</p> | 
|  |  | 
|  | <p>This committer does support  “dynamic partition overwrite” (see below).</p> | 
|  |  | 
|  | <p>For details on availability and use of this committer, consult | 
|  | the hadoop documentation for the Hadoop release used.</p> | 
|  |  | 
|  | <p>It is not available on Hadoop 3.3.4 or earlier.</p> | 
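|  |  | 
|  | <p>As a hedged sketch of what enabling it for ABFS can look like (treat the factory class name as an assumption to verify against the Hadoop manifest committer documentation for the release in use):</p> | 
|  |  | 
|  | <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory | 
|  | spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol | 
|  | spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter | 
|  | </code></pre></div></div> | 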
|  |  | 
|  | <h3 id="ibm-cloud-object-storage-stocator">IBM Cloud Object Storage: Stocator</h3> | 
|  |  | 
|  | <p>IBM provide the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.</p> | 
|  |  | 
|  | <p>Source, documentation and releases can be found at | 
|  | <a href="https://github.com/CODAIT/stocator">Stocator - Storage Connector for Apache Spark</a>.</p> | 
|  |  | 
|  | <h2 id="cloud-committers-and-insert-overwrite-table">Cloud Committers and <code class="language-plaintext highlighter-rouge">INSERT OVERWRITE TABLE</code></h2> | 
|  |  | 
|  | <p>Spark has a feature called “dynamic partition overwrite”; a table can be updated and only those | 
|  | partitions into which new data is added will have their contents replaced.</p> | 
|  |  | 
|  | <p>This is used in SQL statements of the form <code class="language-plaintext highlighter-rouge">INSERT OVERWRITE TABLE</code>, | 
|  | and when Datasets are written in mode “overwrite”:</p> | 
|  |  | 
|  | <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nv">eventDataset</span><span class="o">.</span><span class="py">write</span> | 
|  | <span class="o">.</span><span class="py">mode</span><span class="o">(</span><span class="s">"overwrite"</span><span class="o">)</span> | 
|  | <span class="o">.</span><span class="py">partitionBy</span><span class="o">(</span><span class="s">"year"</span><span class="o">,</span> <span class="s">"month"</span><span class="o">)</span> | 
|  | <span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"parquet"</span><span class="o">)</span> | 
|  | <span class="o">.</span><span class="py">save</span><span class="o">(</span><span class="n">tablePath</span><span class="o">)</span></code></pre></figure> | 
|  |  | 
|  | <p>This feature uses file renaming and has specific requirements of | 
|  | both the committer and the filesystem:</p> | 
|  |  | 
|  | <ol> | 
|  | <li>The committer’s working directory must be in the destination filesystem.</li> | 
|  | <li>The target filesystem must support file rename efficiently.</li> | 
|  | </ol> | 
|  |  | 
|  | <p>These conditions are <em>not</em> met by the S3A committers and AWS S3 storage.</p> | 
|  |  | 
|  | <p>Committers for other cloud stores <em>may</em> support this feature, and | 
|  | declare to Spark that they are compatible. If dynamic partition overwrite | 
|  | is required when writing data through a Hadoop committer, Spark | 
|  | will always permit this when the original <code class="language-plaintext highlighter-rouge">FileOutputCommitter</code> | 
|  | is used. For other committers, after their instantiation, Spark | 
|  | will probe for their declaration of compatibility, and | 
|  | permit the operation if they state that they are compatible.</p> | 
|  |  | 
|  | <p>If the committer is not compatible, the operation will fail with | 
|  | the error message | 
|  | <code class="language-plaintext highlighter-rouge">PathOutputCommitter does not support dynamicPartitionOverwrite</code></p> | 
|  |  | 
|  | <p>Unless there is a compatible committer for the target filesystem, | 
|  | the sole solution is to use a cloud-friendly format for data | 
|  | storage.</p> | 
|  |  | 
|  | <h2 id="further-reading">Further Reading</h2> | 
|  |  | 
|  | <p>Here is the documentation on the standard connectors, both from Apache and the cloud providers.</p> | 
|  |  | 
|  | <ul> | 
|  | <li><a href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">Azure Blob Storage</a>.</li> | 
|  | <li><a href="https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html">Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2</a>.</li> | 
|  | <li><a href="https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html">Azure Data Lake Gen 1</a>.</li> | 
|  | <li><a href="https://aws.amazon.com/s3/consistency/">Amazon S3 Strong Consistency</a></li> | 
|  | <li><a href="https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html">Hadoop-AWS module (Hadoop 3.x)</a>.</li> | 
|  | <li><a href="https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html">Amazon EMR File System (EMRFS)</a>. From Amazon.</li> | 
|  | <li><a href="https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html">Using the EMRFS S3-optimized Committer</a></li> | 
|  | <li><a href="https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage">Google Cloud Storage Connector for Spark and Hadoop</a>. From Google.</li> | 
|  | <li><a href="https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver">The Azure Blob Filesystem driver (ABFS)</a></li> | 
|  | <li>IBM Cloud Object Storage connector for Apache Spark: <a href="https://github.com/CODAIT/stocator">Stocator</a>, | 
|  | <a href="https://www.ibm.com/cloud/object-storage">IBM Object Storage</a>. From IBM.</li> | 
|  | <li><a href="https://github.com/aliyun/alibabacloud-jindofs">Using JindoFS SDK to access Alibaba Cloud OSS</a>.</li> | 
|  | </ul> | 
|  |  | 
|  | <p>The Cloud Committer problem and hive-compatible solutions</p> | 
|  | <ul> | 
|  | <li><a href="https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html">Committing work to S3 with the S3A Committers</a></li> | 
|  | <li><a href="https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/">Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer</a></li> | 
|  | <li><a href="https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md">The Manifest Committer for Azure and Google Cloud Storage</a></li> | 
|  | <li><a href="https://github.com/steveloughran/zero-rename-committer/releases/">A Zero-rename committer</a>.</li> | 
|  | <li><a href="http://arxiv.org/abs/1709.01812">Stocator: A High Performance Object Store Connector for Spark</a></li> | 
|  | </ul> | 
|  |  | 
|  |  | 
|  | </div> | 
|  |  | 
|  | <!-- /container --> | 
|  | </div> | 
|  |  | 
|  | <script src="js/vendor/jquery-3.5.1.min.js"></script> | 
|  | <script src="js/vendor/bootstrap.bundle.min.js"></script> | 
|  |  | 
|  | <script src="js/vendor/anchor.min.js"></script> | 
|  | <script src="js/main.js"></script> | 
|  |  | 
|  | <script type="text/javascript" src="js/vendor/docsearch.min.js"></script> | 
|  | <script type="text/javascript"> | 
|  | // DocSearch is entirely free and automated. DocSearch is built in two parts: | 
|  | // 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link | 
|  | //    in your website and extract content from every page it traverses. It then pushes this | 
|  | //    content to an Algolia index. | 
|  | // 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index | 
|  | //    to your search input and display its results in a dropdown UI. If you want to find more | 
|  | //    details on how works DocSearch, check the docs of DocSearch. | 
|  | docsearch({ | 
|  | apiKey: 'd62f962a82bc9abb53471cb7b89da35e', | 
|  | appId: 'RAI69RXRSK', | 
|  | indexName: 'apache_spark', | 
|  | inputSelector: '#docsearch-input', | 
|  | enhancedSearchInput: true, | 
|  | algoliaOptions: { | 
|  | 'facetFilters': ["version:3.5.0"] | 
|  | }, | 
|  | debug: false // Set debug to true if you want to inspect the dropdown | 
|  | }); | 
|  |  | 
|  | </script> | 
|  |  | 
|  | <!-- MathJax Section --> | 
|  | <script type="text/x-mathjax-config"> | 
|  | MathJax.Hub.Config({ | 
|  | TeX: { equationNumbers: { autoNumber: "AMS" } } | 
|  | }); | 
|  | </script> | 
|  | <script> | 
|  | // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. | 
|  | // We could use "//cdn.mathjax...", but that won't support "file://". | 
|  | (function(d, script) { | 
|  | script = d.createElement('script'); | 
|  | script.type = 'text/javascript'; | 
|  | script.async = true; | 
|  | script.onload = function(){ | 
|  | MathJax.Hub.Config({ | 
|  | tex2jax: { | 
|  | inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], | 
|  | displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], | 
|  | processEscapes: true, | 
|  | skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] | 
|  | } | 
|  | }); | 
|  | }; | 
|  | script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + | 
|  | 'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' + | 
|  | '?config=TeX-AMS-MML_HTMLorMML'; | 
|  | d.getElementsByTagName('head')[0].appendChild(script); | 
|  | }(document)); | 
|  | </script> | 
|  | </body> | 
|  | </html> |