| |
| <!DOCTYPE html> |
| <!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7" lang="en"> <![endif]--> |
| <!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8" lang="en"> <![endif]--> |
| <!--[if IE 8]> <html class="no-js lt-ie9" lang="en"> <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>RDD Programming Guide - Spark 3.5.0 Documentation</title> |
| |
| <meta name="description" content="Spark 3.5.0 programming guide in Java, Scala and Python"> |
| |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <link rel="preconnect" href="https://fonts.googleapis.com"> |
| <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> |
| <link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&family=Courier+Prime:wght@400;700&display=swap" rel="stylesheet"> |
| <link href="css/custom.css" rel="stylesheet"> |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| <link rel="stylesheet" href="css/docsearch.min.css" /> |
| <link rel="stylesheet" href="css/docsearch.css"> |
| |
| <!-- Matomo --> |
| <script type="text/javascript"> |
| var _paq = window._paq = window._paq || []; |
| /* tracker methods like "setCustomDimension" should be called before "trackPageView" */ |
| _paq.push(["disableCookies"]); |
| _paq.push(['trackPageView']); |
| _paq.push(['enableLinkTracking']); |
| (function() { |
| var u="https://analytics.apache.org/"; |
| _paq.push(['setTrackerUrl', u+'matomo.php']); |
| _paq.push(['setSiteId', '40']); |
| var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; |
| g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); |
| })(); |
| </script> |
| <!-- End Matomo Code --> |
| </head> |
| <body class="global"> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| |
| <nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar"> |
| <div class="navbar-brand"><a href="index.html"> |
| <img src="img/spark-logo-rev.svg" width="141" height="72" alt="Apache Spark"/></a><span class="version">3.5.0</span> |
| </div> |
| <button class="navbar-toggler" type="button" data-toggle="collapse" |
| data-target="#navbarCollapse" aria-controls="navbarCollapse" |
| aria-expanded="false" aria-label="Toggle navigation"> |
| <span class="navbar-toggler-icon"></span> |
| </button> |
| <div class="collapse navbar-collapse" id="navbarCollapse"> |
| <ul class="navbar-nav me-auto"> |
| <li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a> |
| <div class="dropdown-menu" aria-labelledby="navbarQuickStart"> |
| <a class="dropdown-item" href="quick-start.html">Quick Start</a> |
| <a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcast Vars</a> |
| <a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a> |
| <a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a> |
| <a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a> |
| <a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a> |
| <a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a> |
| <a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a> |
| <a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a> |
| <div class="dropdown-menu" aria-labelledby="navbarAPIDocs"> |
| <a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a> |
| <a class="dropdown-item" href="api/java/index.html">Java</a> |
| <a class="dropdown-item" href="api/python/index.html">Python</a> |
| <a class="dropdown-item" href="api/R/index.html">R</a> |
| <a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a> |
| <div class="dropdown-menu" aria-labelledby="navbarDeploying"> |
| <a class="dropdown-item" href="cluster-overview.html">Overview</a> |
| <a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a> |
| <a class="dropdown-item" href="running-on-mesos.html">Mesos</a> |
| <a class="dropdown-item" href="running-on-yarn.html">YARN</a> |
| <a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a> |
| <div class="dropdown-menu" aria-labelledby="navbarMore"> |
| <a class="dropdown-item" href="configuration.html">Configuration</a> |
| <a class="dropdown-item" href="monitoring.html">Monitoring</a> |
| <a class="dropdown-item" href="tuning.html">Tuning Guide</a> |
| <a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a> |
| <a class="dropdown-item" href="security.html">Security</a> |
| <a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a> |
| <a class="dropdown-item" href="migration-guide.html">Migration Guide</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="building-spark.html">Building Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a> |
| </div> |
| </li> |
| |
| <li class="nav-item"> |
| <input type="text" id="docsearch-input" placeholder="Search the docs…" aria-label="Search the docs"> |
| </li> |
| </ul> |
| <!--<span class="navbar-text navbar-right"><span class="version-text">v3.5.0</span></span>--> |
| </div> |
| </nav> |
| |
| |
| |
| <div class="container"> |
| |
| |
| <div class="content mr-3" id="content"> |
| |
| |
| <h1 class="title">RDD Programming Guide</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#overview" id="markdown-toc-overview">Overview</a></li> |
| <li><a href="#linking-with-spark" id="markdown-toc-linking-with-spark">Linking with Spark</a></li> |
| <li><a href="#initializing-spark" id="markdown-toc-initializing-spark">Initializing Spark</a> <ul> |
| <li><a href="#using-the-shell" id="markdown-toc-using-the-shell">Using the Shell</a></li> |
| </ul> |
| </li> |
| <li><a href="#resilient-distributed-datasets-rdds" id="markdown-toc-resilient-distributed-datasets-rdds">Resilient Distributed Datasets (RDDs)</a> <ul> |
| <li><a href="#parallelized-collections" id="markdown-toc-parallelized-collections">Parallelized Collections</a></li> |
| <li><a href="#external-datasets" id="markdown-toc-external-datasets">External Datasets</a></li> |
| <li><a href="#rdd-operations" id="markdown-toc-rdd-operations">RDD Operations</a> <ul> |
| <li><a href="#basics" id="markdown-toc-basics">Basics</a></li> |
| <li><a href="#passing-functions-to-spark" id="markdown-toc-passing-functions-to-spark">Passing Functions to Spark</a></li> |
| <li><a href="#understanding-closures-" id="markdown-toc-understanding-closures-">Understanding closures</a> <a name="ClosuresLink"></a> <ul> |
| <li><a href="#example" id="markdown-toc-example">Example</a></li> |
| <li><a href="#local-vs-cluster-modes" id="markdown-toc-local-vs-cluster-modes">Local vs. cluster modes</a></li> |
| <li><a href="#printing-elements-of-an-rdd" id="markdown-toc-printing-elements-of-an-rdd">Printing elements of an RDD</a></li> |
| </ul> |
| </li> |
| <li><a href="#working-with-key-value-pairs" id="markdown-toc-working-with-key-value-pairs">Working with Key-Value Pairs</a></li> |
| <li><a href="#transformations" id="markdown-toc-transformations">Transformations</a></li> |
| <li><a href="#actions" id="markdown-toc-actions">Actions</a></li> |
| <li><a href="#shuffle-operations" id="markdown-toc-shuffle-operations">Shuffle operations</a> <ul> |
| <li><a href="#background" id="markdown-toc-background">Background</a></li> |
| <li><a href="#performance-impact" id="markdown-toc-performance-impact">Performance Impact</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li><a href="#rdd-persistence" id="markdown-toc-rdd-persistence">RDD Persistence</a> <ul> |
| <li><a href="#which-storage-level-to-choose" id="markdown-toc-which-storage-level-to-choose">Which Storage Level to Choose?</a></li> |
| <li><a href="#removing-data" id="markdown-toc-removing-data">Removing Data</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li><a href="#shared-variables" id="markdown-toc-shared-variables">Shared Variables</a> <ul> |
| <li><a href="#broadcast-variables" id="markdown-toc-broadcast-variables">Broadcast Variables</a></li> |
| <li><a href="#accumulators" id="markdown-toc-accumulators">Accumulators</a></li> |
| </ul> |
| </li> |
| <li><a href="#deploying-to-a-cluster" id="markdown-toc-deploying-to-a-cluster">Deploying to a Cluster</a></li> |
| <li><a href="#launching-spark-jobs-from-java--scala" id="markdown-toc-launching-spark-jobs-from-java--scala">Launching Spark jobs from Java / Scala</a></li> |
| <li><a href="#unit-testing" id="markdown-toc-unit-testing">Unit Testing</a></li> |
| <li><a href="#where-to-go-from-here" id="markdown-toc-where-to-go-from-here">Where to Go from Here</a></li> |
| </ul> |
| |
| <h1 id="overview">Overview</h1> |
| |
| <p>At a high level, every Spark application consists of a <em>driver program</em> that runs the user’s <code class="language-plaintext highlighter-rouge">main</code> function and executes various <em>parallel operations</em> on a cluster. The main abstraction Spark provides is a <em>resilient distributed dataset</em> (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to <em>persist</em> an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.</p> |
| |
| <p>A second abstraction in Spark is <em>shared variables</em> that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: <em>broadcast variables</em>, which can be used to cache a value in memory on all nodes, and <em>accumulators</em>, which are variables that are only “added” to, such as counters and sums.</p> |
| |
| <p>This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow |
| along if you launch Spark’s interactive shell – either <code class="language-plaintext highlighter-rouge">bin/spark-shell</code> for the Scala shell or |
| <code class="language-plaintext highlighter-rouge">bin/pyspark</code> for the Python one.</p> |
| |
| <h1 id="linking-with-spark">Linking with Spark</h1> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>Spark 3.5.0 works with Python 3.8+. It can use the standard CPython interpreter, |
| so C libraries like NumPy can be used. It also works with PyPy 7.3.6+.</p> |
| |
| <p>Spark applications in Python can either be run with the <code class="language-plaintext highlighter-rouge">bin/spark-submit</code> script which includes Spark at runtime, or by including it in your setup.py as:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"> <span class="n">install_requires</span><span class="o">=</span><span class="p">[</span> |
| <span class="s">'pyspark==3.5.0'</span> |
| <span class="p">]</span></code></pre></figure> |
| |
| <p>To run Spark applications in Python without pip installing PySpark, use the <code class="language-plaintext highlighter-rouge">bin/spark-submit</code> script located in the Spark directory. |
| This script will load Spark’s Java/Scala libraries and allow you to submit applications to a cluster. |
| You can also use <code class="language-plaintext highlighter-rouge">bin/pyspark</code> to launch an interactive Python shell.</p> |
| |
| <p>If you wish to access HDFS data, you need to use a build of PySpark linking |
| to your version of HDFS. |
| <a href="https://spark.apache.org/downloads.html">Prebuilt packages</a> are also available on the Spark homepage |
| for common HDFS versions.</p> |
| |
| <p>Finally, you need to import some Spark classes into your program. Add the following line:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span><span class="p">,</span> <span class="n">SparkConf</span></code></pre></figure> |
| |
| <p>PySpark requires the same minor version of Python in both driver and workers. It uses the default Python version in PATH; |
| you can specify which version of Python you want to use by setting <code class="language-plaintext highlighter-rouge">PYSPARK_PYTHON</code>, for example:</p> |
| |
| <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ PYSPARK_PYTHON</span><span class="o">=</span>python3.8 bin/pyspark |
| <span class="nv">$ PYSPARK_PYTHON</span><span class="o">=</span>/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py</code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>Spark 3.5.0 is built and distributed to work with Scala 2.12 |
| by default. (Spark can be built to work with other versions of Scala, too.) To write |
| applications in Scala, you will need to use a compatible Scala version (e.g. 2.12.X).</p> |
| |
| <p>To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>groupId = org.apache.spark |
| artifactId = spark-core_2.12 |
| version = 3.5.0 |
| </code></pre></div> </div> |
| |
| <p>In addition, if you wish to access an HDFS cluster, you need to add a dependency on |
| <code class="language-plaintext highlighter-rouge">hadoop-client</code> for your version of HDFS.</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>groupId = org.apache.hadoop |
| artifactId = hadoop-client |
| version = &lt;your-hdfs-version&gt; |
| </code></pre></div> </div> |
| |
| <p>Finally, you need to import some Spark classes into your program. Add the following lines:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark.SparkContext</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.SparkConf</span></code></pre></figure> |
| |
| <p>(Before Spark 1.3.0, you need to explicitly <code class="language-plaintext highlighter-rouge">import org.apache.spark.SparkContext._</code> to enable essential implicit conversions.)</p> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>Spark 3.5.0 supports |
| <a href="http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html">lambda expressions</a> |
| for concisely writing functions; otherwise, you can use the classes in the |
| <a href="api/java/index.html?org/apache/spark/api/java/function/package-summary.html">org.apache.spark.api.java.function</a> package.</p> |
| |
| <p>Note that support for Java 7 was removed in Spark 2.2.0.</p> |
| |
| <p>To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>groupId = org.apache.spark |
| artifactId = spark-core_2.12 |
| version = 3.5.0 |
| </code></pre></div> </div> |
| |
| <p>In addition, if you wish to access an HDFS cluster, you need to add a dependency on |
| <code class="language-plaintext highlighter-rouge">hadoop-client</code> for your version of HDFS.</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>groupId = org.apache.hadoop |
| artifactId = hadoop-client |
| version = &lt;your-hdfs-version&gt; |
| </code></pre></div> </div> |
| |
| <p>Finally, you need to import some Spark classes into your program. Add the following lines:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">org.apache.spark.api.java.JavaSparkContext</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.api.java.JavaRDD</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.SparkConf</span><span class="o">;</span></code></pre></figure> |
| |
| </div> |
| |
| </div> |
| |
| <h1 id="initializing-spark">Initializing Spark</h1> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>The first thing a Spark program must do is to create a <a href="api/python/reference/api/pyspark.SparkContext.html#pyspark.SparkContext">SparkContext</a> object, which tells Spark |
| how to access a cluster. To create a <code class="language-plaintext highlighter-rouge">SparkContext</code> you first need to build a <a href="api/python/reference/api/pyspark.SparkConf.html#pyspark.SparkConf">SparkConf</a> object |
| that contains information about your application.</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">conf</span> <span class="o">=</span> <span class="n">SparkConf</span><span class="p">().</span><span class="n">setAppName</span><span class="p">(</span><span class="n">appName</span><span class="p">).</span><span class="n">setMaster</span><span class="p">(</span><span class="n">master</span><span class="p">)</span> |
| <span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">)</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>The first thing a Spark program must do is to create a <a href="api/scala/org/apache/spark/SparkContext.html">SparkContext</a> object, which tells Spark |
| how to access a cluster. To create a <code class="language-plaintext highlighter-rouge">SparkContext</code> you first need to build a <a href="api/scala/org/apache/spark/SparkConf.html">SparkConf</a> object |
| that contains information about your application.</p> |
| |
| <p>Only one SparkContext should be active per JVM. You must <code class="language-plaintext highlighter-rouge">stop()</code> the active SparkContext before creating a new one.</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="py">setAppName</span><span class="o">(</span><span class="n">appName</span><span class="o">).</span><span class="py">setMaster</span><span class="o">(</span><span class="n">master</span><span class="o">)</span> |
| <span class="k">new</span> <span class="nc">SparkContext</span><span class="o">(</span><span class="n">conf</span><span class="o">)</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>The first thing a Spark program must do is to create a <a href="api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html">JavaSparkContext</a> object, which tells Spark |
| how to access a cluster. To create a <code class="language-plaintext highlighter-rouge">SparkContext</code> you first need to build a <a href="api/java/index.html?org/apache/spark/SparkConf.html">SparkConf</a> object |
| that contains information about your application.</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">SparkConf</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="na">setAppName</span><span class="o">(</span><span class="n">appName</span><span class="o">).</span><span class="na">setMaster</span><span class="o">(</span><span class="n">master</span><span class="o">);</span> |
| <span class="nc">JavaSparkContext</span> <span class="n">sc</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">JavaSparkContext</span><span class="o">(</span><span class="n">conf</span><span class="o">);</span></code></pre></figure> |
| |
| </div> |
| |
| </div> |
| |
| <p>The <code class="language-plaintext highlighter-rouge">appName</code> parameter is a name for your application to show on the cluster UI. |
| <code class="language-plaintext highlighter-rouge">master</code> is a <a href="submitting-applications.html#master-urls">Spark, Mesos or YARN cluster URL</a>, |
| or a special “local” string to run in local mode. |
| In practice, when running on a cluster, you will not want to hardcode <code class="language-plaintext highlighter-rouge">master</code> in the program, |
| but rather <a href="submitting-applications.html">launch the application with <code class="language-plaintext highlighter-rouge">spark-submit</code></a> and |
| receive it there. However, for local testing and unit tests, you can pass “local” to run Spark |
| in-process.</p> |
| |
| <h2 id="using-the-shell">Using the Shell</h2> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the |
| variable called <code class="language-plaintext highlighter-rouge">sc</code>. Making your own SparkContext will not work. You can set which master the |
| context connects to using the <code class="language-plaintext highlighter-rouge">--master</code> argument, and you can add Python .zip, .egg or .py files |
| to the runtime path by passing a comma-separated list to <code class="language-plaintext highlighter-rouge">--py-files</code>. For third-party Python dependencies, |
| see <a href="api/python/user_guide/python_packaging.html">Python Package Management</a>. You can also add dependencies |
| (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates |
| to the <code class="language-plaintext highlighter-rouge">--packages</code> argument. Any additional repositories where dependencies might exist (e.g. Sonatype) |
| can be passed to the <code class="language-plaintext highlighter-rouge">--repositories</code> argument. For example, to run |
| <code class="language-plaintext highlighter-rouge">bin/pyspark</code> on exactly four cores, use:</p> |
| |
| <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/pyspark <span class="nt">--master</span> <span class="nb">local</span><span class="o">[</span>4]</code></pre></figure> |
| |
| <p>Or, to also add <code class="language-plaintext highlighter-rouge">code.py</code> to the search path (in order to later be able to <code class="language-plaintext highlighter-rouge">import code</code>), use:</p> |
| |
| <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/pyspark <span class="nt">--master</span> <span class="nb">local</span><span class="o">[</span>4] <span class="nt">--py-files</span> code.py</code></pre></figure> |
| |
| <p>For a complete list of options, run <code class="language-plaintext highlighter-rouge">pyspark --help</code>. Behind the scenes, |
| <code class="language-plaintext highlighter-rouge">pyspark</code> invokes the more general <a href="submitting-applications.html"><code class="language-plaintext highlighter-rouge">spark-submit</code> script</a>.</p> |
| |
| <p>It is also possible to launch the PySpark shell in <a href="http://ipython.org">IPython</a>, the |
| enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To |
| use IPython, set the <code class="language-plaintext highlighter-rouge">PYSPARK_DRIVER_PYTHON</code> variable to <code class="language-plaintext highlighter-rouge">ipython</code> when running <code class="language-plaintext highlighter-rouge">bin/pyspark</code>:</p> |
| |
| <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ PYSPARK_DRIVER_PYTHON</span><span class="o">=</span>ipython ./bin/pyspark</code></pre></figure> |
| |
| <p>To use the Jupyter notebook (previously known as the IPython notebook), run:</p> |
| |
| <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ PYSPARK_DRIVER_PYTHON</span><span class="o">=</span>jupyter <span class="nv">PYSPARK_DRIVER_PYTHON_OPTS</span><span class="o">=</span>notebook ./bin/pyspark</code></pre></figure> |
| |
| <p>You can customize the <code class="language-plaintext highlighter-rouge">ipython</code> or <code class="language-plaintext highlighter-rouge">jupyter</code> commands by setting <code class="language-plaintext highlighter-rouge">PYSPARK_DRIVER_PYTHON_OPTS</code>.</p> |
| |
| <p>After the Jupyter Notebook server is launched, you can create a new notebook from |
| the “Files” tab. Inside the notebook, you can input the command <code class="language-plaintext highlighter-rouge">%pylab inline</code> as part of |
| your notebook before you start to try Spark from the Jupyter notebook.</p> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the |
| variable called <code class="language-plaintext highlighter-rouge">sc</code>. Making your own SparkContext will not work. You can set which master the |
| context connects to using the <code class="language-plaintext highlighter-rouge">--master</code> argument, and you can add JARs to the classpath |
| by passing a comma-separated list to the <code class="language-plaintext highlighter-rouge">--jars</code> argument. You can also add dependencies |
| (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates |
| to the <code class="language-plaintext highlighter-rouge">--packages</code> argument. Any additional repositories where dependencies might exist (e.g. Sonatype) |
| can be passed to the <code class="language-plaintext highlighter-rouge">--repositories</code> argument. For example, to run <code class="language-plaintext highlighter-rouge">bin/spark-shell</code> on exactly |
| four cores, use:</p> |
| |
| <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/spark-shell <span class="nt">--master</span> <span class="nb">local</span><span class="o">[</span>4]</code></pre></figure> |
| |
| <p>Or, to also add <code class="language-plaintext highlighter-rouge">code.jar</code> to its classpath, use:</p> |
| |
| <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/spark-shell <span class="nt">--master</span> <span class="nb">local</span><span class="o">[</span>4] <span class="nt">--jars</span> code.jar</code></pre></figure> |
| |
| <p>To include a dependency using Maven coordinates:</p> |
| |
| <figure class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/spark-shell <span class="nt">--master</span> <span class="nb">local</span><span class="o">[</span>4] <span class="nt">--packages</span> <span class="s2">"org.example:example:0.1"</span></code></pre></figure> |
| |
| <p>For a complete list of options, run <code class="language-plaintext highlighter-rouge">spark-shell --help</code>. Behind the scenes, |
| <code class="language-plaintext highlighter-rouge">spark-shell</code> invokes the more general <a href="submitting-applications.html"><code class="language-plaintext highlighter-rouge">spark-submit</code> script</a>.</p> |
| |
| </div> |
| |
| </div> |
| |
| <h1 id="resilient-distributed-datasets-rdds">Resilient Distributed Datasets (RDDs)</h1> |
| |
| <p>Spark revolves around the concept of a <em>resilient distributed dataset</em> (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: <em>parallelizing</em> |
| an existing collection in your driver program, or referencing a dataset in an external storage system, such as a |
| shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.</p> |
| |
| <h2 id="parallelized-collections">Parallelized Collections</h2> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>Parallelized collections are created by calling <code class="language-plaintext highlighter-rouge">SparkContext</code>’s <code class="language-plaintext highlighter-rouge">parallelize</code> method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span> <span class="o">=</span> <span class="p">[</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">,</span> <span class="mi">4</span><span class="p">,</span> <span class="mi">5</span><span class="p">]</span> |
| <span class="n">distData</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">parallelize</span><span class="p">(</span><span class="n">data</span><span class="p">)</span></code></pre></figure> |
| |
| <p>Once created, the distributed dataset (<code class="language-plaintext highlighter-rouge">distData</code>) can be operated on in parallel. For example, we can call <code class="language-plaintext highlighter-rouge">distData.reduce(lambda a, b: a + b)</code> to add up the elements of the list. |
| We describe operations on distributed datasets later on.</p> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>Parallelized collections are created by calling <code class="language-plaintext highlighter-rouge">SparkContext</code>’s <code class="language-plaintext highlighter-rouge">parallelize</code> method on an existing collection in your driver program (a Scala <code class="language-plaintext highlighter-rouge">Seq</code>). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">data</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> <span class="mi">5</span><span class="o">)</span> |
| <span class="k">val</span> <span class="nv">distData</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="n">data</span><span class="o">)</span></code></pre></figure> |
| |
| <p>Once created, the distributed dataset (<code class="language-plaintext highlighter-rouge">distData</code>) can be operated on in parallel. For example, we might call <code class="language-plaintext highlighter-rouge">distData.reduce((a, b) => a + b)</code> to add up the elements of the array. We describe operations on distributed datasets later on.</p> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>Parallelized collections are created by calling <code class="language-plaintext highlighter-rouge">JavaSparkContext</code>’s <code class="language-plaintext highlighter-rouge">parallelize</code> method on an existing <code class="language-plaintext highlighter-rouge">Collection</code> in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">List</span><span class="o"><</span><span class="nc">Integer</span><span class="o">></span> <span class="n">data</span> <span class="o">=</span> <span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> <span class="mi">5</span><span class="o">);</span> |
| <span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">Integer</span><span class="o">></span> <span class="n">distData</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">parallelize</span><span class="o">(</span><span class="n">data</span><span class="o">);</span></code></pre></figure> |
| |
| <p>Once created, the distributed dataset (<code class="language-plaintext highlighter-rouge">distData</code>) can be operated on in parallel. For example, we might call <code class="language-plaintext highlighter-rouge">distData.reduce((a, b) -> a + b)</code> to add up the elements of the list. |
| We describe operations on distributed datasets later on.</p> |
| |
| </div> |
| |
| </div> |
| |
| <p>One important parameter for parallel collections is the number of <em>partitions</em> to cut the dataset into. Spark will run one task for each partition of the dataset. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to <code class="language-plaintext highlighter-rouge">parallelize</code> (e.g. <code class="language-plaintext highlighter-rouge">sc.parallelize(data, 10)</code>). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.</p> |
| |
| <h2 id="external-datasets">External Datasets</h2> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, <a href="http://wiki.apache.org/hadoop/AmazonS3">Amazon S3</a>, etc. Spark supports text files, <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, and any other Hadoop <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html">InputFormat</a>.</p> |
| |
| <p>Text file RDDs can be created using <code class="language-plaintext highlighter-rouge">SparkContext</code>’s <code class="language-plaintext highlighter-rouge">textFile</code> method. This method takes a URI for the file (either a local path on the machine, or a <code class="language-plaintext highlighter-rouge">hdfs://</code>, <code class="language-plaintext highlighter-rouge">s3a://</code>, etc. URI) and reads it as a collection of lines. Here is an example invocation:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">>>></span> <span class="n">distFile</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"data.txt"</span><span class="p">)</span></code></pre></figure> |
| |
| <p>Once created, <code class="language-plaintext highlighter-rouge">distFile</code> can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the <code class="language-plaintext highlighter-rouge">map</code> and <code class="language-plaintext highlighter-rouge">reduce</code> operations as follows: <code class="language-plaintext highlighter-rouge">distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)</code>.</p> |
| |
| <p>Some notes on reading files with Spark:</p> |
| |
| <ul> |
| <li> |
| <p>If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.</p> |
| </li> |
| <li> |
| <p>All of Spark’s file-based input methods, including <code class="language-plaintext highlighter-rouge">textFile</code>, support running on directories, compressed files, and wildcards as well. For example, you can use <code class="language-plaintext highlighter-rouge">textFile("/my/directory")</code>, <code class="language-plaintext highlighter-rouge">textFile("/my/directory/*.txt")</code>, and <code class="language-plaintext highlighter-rouge">textFile("/my/directory/*.gz")</code>.</p> |
| </li> |
| <li> |
| <p>The <code class="language-plaintext highlighter-rouge">textFile</code> method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.</p> |
| </li> |
| </ul> |
| |
| <p>Apart from text files, Spark’s Python API also supports several other data formats:</p> |
| |
| <ul> |
| <li> |
| <p><code class="language-plaintext highlighter-rouge">SparkContext.wholeTextFiles</code> lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with <code class="language-plaintext highlighter-rouge">textFile</code>, which would return one record per line in each file.</p> |
| </li> |
| <li> |
| <p><code class="language-plaintext highlighter-rouge">RDD.saveAsPickleFile</code> and <code class="language-plaintext highlighter-rouge">SparkContext.pickleFile</code> support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.</p> |
| </li> |
| <li> |
| <p>SequenceFile and Hadoop Input/Output Formats</p> |
| </li> |
| </ul> |
| |
| <p><strong>Note:</strong> this feature is currently marked <code class="language-plaintext highlighter-rouge">Experimental</code> and is intended for advanced users. It may be replaced in the future with read/write support based on Spark SQL, in which case Spark SQL is the preferred approach.</p> |
| |
| <p><strong>Writable Support</strong></p> |
| |
| <p>PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the |
| resulting Java objects using <a href="https://github.com/irmen/pickle/">pickle</a>. When saving an RDD of key-value pairs to SequenceFile, |
| PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following |
| Writables are automatically converted:</p> |
| |
| <table class="table table-striped"> |
| <thead><tr><th>Writable Type</th><th>Python Type</th></tr></thead> |
| <tr><td>Text</td><td>str</td></tr> |
| <tr><td>IntWritable</td><td>int</td></tr> |
| <tr><td>FloatWritable</td><td>float</td></tr> |
| <tr><td>DoubleWritable</td><td>float</td></tr> |
| <tr><td>BooleanWritable</td><td>bool</td></tr> |
| <tr><td>BytesWritable</td><td>bytearray</td></tr> |
| <tr><td>NullWritable</td><td>None</td></tr> |
| <tr><td>MapWritable</td><td>dict</td></tr> |
| </table> |
| |
| <p>Arrays are not handled out-of-the-box. Users need to specify custom <code class="language-plaintext highlighter-rouge">ArrayWritable</code> subtypes when reading or writing. When writing, |
| users also need to specify custom converters that convert arrays to custom <code class="language-plaintext highlighter-rouge">ArrayWritable</code> subtypes. When reading, the default |
| converter will convert custom <code class="language-plaintext highlighter-rouge">ArrayWritable</code> subtypes to Java <code class="language-plaintext highlighter-rouge">Object[]</code>, which then get pickled to Python tuples. To get |
| Python <code class="language-plaintext highlighter-rouge">array.array</code> for arrays of primitive types, users need to specify custom converters.</p> |
| |
| <p><strong>Saving and Loading SequenceFiles</strong></p> |
| |
| <p>Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value |
| classes can be specified, but for standard Writables this is not required.</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">>>></span> <span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">4</span><span class="p">)).</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="s">"a"</span> <span class="o">*</span> <span class="n">x</span><span class="p">))</span> |
| <span class="o">>>></span> <span class="n">rdd</span><span class="p">.</span><span class="n">saveAsSequenceFile</span><span class="p">(</span><span class="s">"path/to/file"</span><span class="p">)</span> |
| <span class="o">>>></span> <span class="nb">sorted</span><span class="p">(</span><span class="n">sc</span><span class="p">.</span><span class="n">sequenceFile</span><span class="p">(</span><span class="s">"path/to/file"</span><span class="p">).</span><span class="n">collect</span><span class="p">())</span> |
| <span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="sa">u</span><span class="s">'a'</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="sa">u</span><span class="s">'aa'</span><span class="p">),</span> <span class="p">(</span><span class="mi">3</span><span class="p">,</span> <span class="sa">u</span><span class="s">'aaa'</span><span class="p">)]</span></code></pre></figure> |
| |
| <p><strong>Saving and Loading Other Hadoop Input/Output Formats</strong></p> |
| |
| <p>PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both ‘new’ and ‘old’ Hadoop MapReduce APIs. |
| If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the |
| Elasticsearch ESInputFormat:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="err">$</span> <span class="p">.</span><span class="o">/</span><span class="nb">bin</span><span class="o">/</span><span class="n">pyspark</span> <span class="o">--</span><span class="n">jars</span> <span class="o">/</span><span class="n">path</span><span class="o">/</span><span class="n">to</span><span class="o">/</span><span class="n">elasticsearch</span><span class="o">-</span><span class="n">hadoop</span><span class="p">.</span><span class="n">jar</span> |
| <span class="o">>>></span> <span class="n">conf</span> <span class="o">=</span> <span class="p">{</span><span class="s">"es.resource"</span> <span class="p">:</span> <span class="s">"index/type"</span><span class="p">}</span> <span class="c1"># assume Elasticsearch is running on localhost defaults |
| </span><span class="o">>>></span> <span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">newAPIHadoopRDD</span><span class="p">(</span><span class="s">"org.elasticsearch.hadoop.mr.EsInputFormat"</span><span class="p">,</span> |
| <span class="s">"org.apache.hadoop.io.NullWritable"</span><span class="p">,</span> |
| <span class="s">"org.elasticsearch.hadoop.mr.LinkedMapWritable"</span><span class="p">,</span> |
| <span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">)</span> |
| <span class="o">>>></span> <span class="n">rdd</span><span class="p">.</span><span class="n">first</span><span class="p">()</span> <span class="c1"># the result is a MapWritable that is converted to a Python dict |
| </span><span class="p">(</span><span class="sa">u</span><span class="s">'Elasticsearch ID'</span><span class="p">,</span> |
| <span class="p">{</span><span class="sa">u</span><span class="s">'field1'</span><span class="p">:</span> <span class="bp">True</span><span class="p">,</span> |
| <span class="sa">u</span><span class="s">'field2'</span><span class="p">:</span> <span class="sa">u</span><span class="s">'Some Text'</span><span class="p">,</span> |
| <span class="sa">u</span><span class="s">'field3'</span><span class="p">:</span> <span class="mi">12345</span><span class="p">})</span></code></pre></figure> |
| |
| <p>Note that, if the InputFormat simply depends on a Hadoop configuration and/or input path, and |
| the key and value classes can easily be converted according to the above table, |
| then this approach should work well for such cases.</p> |
| |
| <p>If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to |
| transform that data on the Scala/Java side to something which can be handled by pickle’s pickler. |
| A <a href="api/scala/org/apache/spark/api/python/Converter.html">Converter</a> trait is provided |
| for this. Simply extend this trait and implement your transformation code in the <code class="language-plaintext highlighter-rouge">convert</code> |
| method. Remember to ensure that this class, along with any dependencies required to access your <code class="language-plaintext highlighter-rouge">InputFormat</code>, is packaged into your Spark job jar and included on the PySpark |
| classpath.</p> |
| |
| <p>See the <a href="https://github.com/apache/spark/tree/master/examples/src/main/python">Python examples</a> and |
| the <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters">Converter examples</a> |
| for examples of using Cassandra / HBase <code class="language-plaintext highlighter-rouge">InputFormat</code> and <code class="language-plaintext highlighter-rouge">OutputFormat</code> with custom converters.</p> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, <a href="http://wiki.apache.org/hadoop/AmazonS3">Amazon S3</a>, etc. Spark supports text files, <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, and any other Hadoop <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html">InputFormat</a>.</p> |
| |
| <p>Text file RDDs can be created using <code class="language-plaintext highlighter-rouge">SparkContext</code>’s <code class="language-plaintext highlighter-rouge">textFile</code> method. This method takes a URI for the file (either a local path on the machine, or a <code class="language-plaintext highlighter-rouge">hdfs://</code>, <code class="language-plaintext highlighter-rouge">s3a://</code>, etc. URI) and reads it as a collection of lines. Here is an example invocation:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">scala</span><span class="o">></span> <span class="k">val</span> <span class="nv">distFile</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">)</span> |
| <span class="n">distFile</span><span class="k">:</span> <span class="kt">org.apache.spark.rdd.RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="nv">data</span><span class="o">.</span><span class="py">txt</span> <span class="nc">MapPartitionsRDD</span><span class="o">[</span><span class="err">10</span><span class="o">]</span> <span class="n">at</span> <span class="n">textFile</span> <span class="n">at</span> <span class="o"><</span><span class="n">console</span><span class="k">>:</span><span class="mi">26</span></code></pre></figure> |
| |
| <p>Once created, <code class="language-plaintext highlighter-rouge">distFile</code> can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the <code class="language-plaintext highlighter-rouge">map</code> and <code class="language-plaintext highlighter-rouge">reduce</code> operations as follows: <code class="language-plaintext highlighter-rouge">distFile.map(s => s.length).reduce((a, b) => a + b)</code>.</p> |
| |
| <p>Some notes on reading files with Spark:</p> |
| |
| <ul> |
| <li> |
| <p>If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.</p> |
| </li> |
| <li> |
| <p>All of Spark’s file-based input methods, including <code class="language-plaintext highlighter-rouge">textFile</code>, support running on directories, compressed files, and wildcards as well. For example, you can use <code class="language-plaintext highlighter-rouge">textFile("/my/directory")</code>, <code class="language-plaintext highlighter-rouge">textFile("/my/directory/*.txt")</code>, and <code class="language-plaintext highlighter-rouge">textFile("/my/directory/*.gz")</code>. When multiple files are read, the order of the partitions depends on the order the files are returned from the filesystem. It may or may not, for example, follow the lexicographic ordering of the files by path. Within a partition, elements are ordered according to their order in the underlying file.</p> |
| </li> |
| <li> |
| <p>The <code class="language-plaintext highlighter-rouge">textFile</code> method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.</p> |
| </li> |
| </ul> |
| |
| <p>Apart from text files, Spark’s Scala API also supports several other data formats:</p> |
| |
| <ul> |
| <li> |
| <p><code class="language-plaintext highlighter-rouge">SparkContext.wholeTextFiles</code> lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with <code class="language-plaintext highlighter-rouge">textFile</code>, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, <code class="language-plaintext highlighter-rouge">wholeTextFiles</code> provides an optional second argument for controlling the minimal number of partitions.</p> |
| </li> |
| <li> |
| <p>For <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, use SparkContext’s <code class="language-plaintext highlighter-rouge">sequenceFile[K, V]</code> method where <code class="language-plaintext highlighter-rouge">K</code> and <code class="language-plaintext highlighter-rouge">V</code> are the types of key and values in the file. These should be subclasses of Hadoop’s <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Writable.html">Writable</a> interface, like <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/IntWritable.html">IntWritable</a> and <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Text.html">Text</a>. In addition, Spark allows you to specify native types for a few common Writables; for example, <code class="language-plaintext highlighter-rouge">sequenceFile[Int, String]</code> will automatically read IntWritables and Texts.</p> |
| </li> |
| <li> |
| <p>For other Hadoop InputFormats, you can use the <code class="language-plaintext highlighter-rouge">SparkContext.hadoopRDD</code> method, which takes an arbitrary <code class="language-plaintext highlighter-rouge">JobConf</code> and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use <code class="language-plaintext highlighter-rouge">SparkContext.newAPIHadoopRDD</code> for InputFormats based on the “new” MapReduce API (<code class="language-plaintext highlighter-rouge">org.apache.hadoop.mapreduce</code>).</p> |
| </li> |
| <li> |
| <p><code class="language-plaintext highlighter-rouge">RDD.saveAsObjectFile</code> and <code class="language-plaintext highlighter-rouge">SparkContext.objectFile</code> support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.</p> |
| </li> |
| </ul> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, <a href="http://wiki.apache.org/hadoop/AmazonS3">Amazon S3</a>, etc. Spark supports text files, <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, and any other Hadoop <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html">InputFormat</a>.</p> |
| |
| <p>Text file RDDs can be created using <code class="language-plaintext highlighter-rouge">JavaSparkContext</code>’s <code class="language-plaintext highlighter-rouge">textFile</code> method. This method takes a URI for the file (either a local path on the machine, or a <code class="language-plaintext highlighter-rouge">hdfs://</code>, <code class="language-plaintext highlighter-rouge">s3a://</code>, etc. URI) and reads it as a collection of lines. Here is an example invocation:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="n">distFile</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span></code></pre></figure> |
| |
| <p>Once created, <code class="language-plaintext highlighter-rouge">distFile</code> can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the <code class="language-plaintext highlighter-rouge">map</code> and <code class="language-plaintext highlighter-rouge">reduce</code> operations as follows: <code class="language-plaintext highlighter-rouge">distFile.map(s -> s.length()).reduce((a, b) -> a + b)</code>.</p> |
| |
| <p>Some notes on reading files with Spark:</p> |
| |
| <ul> |
| <li> |
| <p>If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.</p> |
| </li> |
| <li> |
| <p>All of Spark’s file-based input methods, including <code class="language-plaintext highlighter-rouge">textFile</code>, support running on directories, compressed files, and wildcards as well. For example, you can use <code class="language-plaintext highlighter-rouge">textFile("/my/directory")</code>, <code class="language-plaintext highlighter-rouge">textFile("/my/directory/*.txt")</code>, and <code class="language-plaintext highlighter-rouge">textFile("/my/directory/*.gz")</code>.</p> |
| </li> |
| <li> |
| <p>The <code class="language-plaintext highlighter-rouge">textFile</code> method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.</p> |
| </li> |
| </ul> |
| |
| <p>Apart from text files, Spark’s Java API also supports several other data formats:</p> |
| |
| <ul> |
| <li> |
| <p><code class="language-plaintext highlighter-rouge">JavaSparkContext.wholeTextFiles</code> lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with <code class="language-plaintext highlighter-rouge">textFile</code>, which would return one record per line in each file.</p> |
| </li> |
| <li> |
| <p>For <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, use SparkContext’s <code class="language-plaintext highlighter-rouge">sequenceFile[K, V]</code> method where <code class="language-plaintext highlighter-rouge">K</code> and <code class="language-plaintext highlighter-rouge">V</code> are the types of key and values in the file. These should be subclasses of Hadoop’s <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Writable.html">Writable</a> interface, like <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/IntWritable.html">IntWritable</a> and <a href="https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Text.html">Text</a>.</p> |
| </li> |
| <li> |
| <p>For other Hadoop InputFormats, you can use the <code class="language-plaintext highlighter-rouge">JavaSparkContext.hadoopRDD</code> method, which takes an arbitrary <code class="language-plaintext highlighter-rouge">JobConf</code> and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use <code class="language-plaintext highlighter-rouge">JavaSparkContext.newAPIHadoopRDD</code> for InputFormats based on the “new” MapReduce API (<code class="language-plaintext highlighter-rouge">org.apache.hadoop.mapreduce</code>).</p> |
| </li> |
| <li> |
| <p><code class="language-plaintext highlighter-rouge">JavaRDD.saveAsObjectFile</code> and <code class="language-plaintext highlighter-rouge">JavaSparkContext.objectFile</code> support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.</p> |
| </li> |
| </ul> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="rdd-operations">RDD Operations</h2> |
| |
| <p>RDDs support two types of operations: <em>transformations</em>, which create a new dataset from an existing one, and <em>actions</em>, which return a value to the driver program after running a computation on the dataset. For example, <code class="language-plaintext highlighter-rouge">map</code> is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, <code class="language-plaintext highlighter-rouge">reduce</code> is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel <code class="language-plaintext highlighter-rouge">reduceByKey</code> that returns a distributed dataset).</p> |
| |
| <p>All transformations in Spark are <i>lazy</i>, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through <code class="language-plaintext highlighter-rouge">map</code> will be used in a <code class="language-plaintext highlighter-rouge">reduce</code> and return only the result of the <code class="language-plaintext highlighter-rouge">reduce</code> to the driver, rather than the larger mapped dataset.</p> |
| |
| <p>By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also <em>persist</em> an RDD in memory using the <code class="language-plaintext highlighter-rouge">persist</code> (or <code class="language-plaintext highlighter-rouge">cache</code>) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.</p> |
| |
| <h3 id="basics">Basics</h3> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>To illustrate RDD basics, consider the simple program below:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"data.txt"</span><span class="p">)</span> |
| <span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="p">.</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="nb">len</span><span class="p">(</span><span class="n">s</span><span class="p">))</span> |
| <span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="p">.</span><span class="nb">reduce</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="p">)</span></code></pre></figure> |
| |
| <p>The first line defines a base RDD from an external file. This dataset is not loaded in memory or |
| otherwise acted on: <code class="language-plaintext highlighter-rouge">lines</code> is merely a pointer to the file. |
| The second line defines <code class="language-plaintext highlighter-rouge">lineLengths</code> as the result of a <code class="language-plaintext highlighter-rouge">map</code> transformation. Again, <code class="language-plaintext highlighter-rouge">lineLengths</code> |
| is <em>not</em> immediately computed, due to laziness. |
| Finally, we run <code class="language-plaintext highlighter-rouge">reduce</code>, which is an action. At this point Spark breaks the computation into tasks |
| to run on separate machines, and each machine runs both its part of the map and a local reduction, |
| returning only its answer to the driver program.</p> |
| |
| <p>If we also wanted to use <code class="language-plaintext highlighter-rouge">lineLengths</code> again later, we could add:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">lineLengths</span><span class="p">.</span><span class="n">persist</span><span class="p">()</span></code></pre></figure> |
| |
| <p>before the <code class="language-plaintext highlighter-rouge">reduce</code>, which would cause <code class="language-plaintext highlighter-rouge">lineLengths</code> to be saved in memory after the first time it is computed.</p> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>To illustrate RDD basics, consider the simple program below:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">lines</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">)</span> |
| <span class="k">val</span> <span class="nv">lineLengths</span> <span class="k">=</span> <span class="nv">lines</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">s</span> <span class="k">=></span> <span class="nv">s</span><span class="o">.</span><span class="py">length</span><span class="o">)</span> |
| <span class="k">val</span> <span class="nv">totalLength</span> <span class="k">=</span> <span class="nv">lineLengths</span><span class="o">.</span><span class="py">reduce</span><span class="o">((</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="k">=></span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">)</span></code></pre></figure> |
| |
| <p>The first line defines a base RDD from an external file. This dataset is not loaded in memory or |
| otherwise acted on: <code class="language-plaintext highlighter-rouge">lines</code> is merely a pointer to the file. |
| The second line defines <code class="language-plaintext highlighter-rouge">lineLengths</code> as the result of a <code class="language-plaintext highlighter-rouge">map</code> transformation. Again, <code class="language-plaintext highlighter-rouge">lineLengths</code> |
| is <em>not</em> immediately computed, due to laziness. |
| Finally, we run <code class="language-plaintext highlighter-rouge">reduce</code>, which is an action. At this point Spark breaks the computation into tasks |
| to run on separate machines, and each machine runs both its part of the map and a local reduction, |
| returning only its answer to the driver program.</p> |
| |
| <p>If we also wanted to use <code class="language-plaintext highlighter-rouge">lineLengths</code> again later, we could add:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nv">lineLengths</span><span class="o">.</span><span class="py">persist</span><span class="o">()</span></code></pre></figure> |
| |
| <p>before the <code class="language-plaintext highlighter-rouge">reduce</code>, which would cause <code class="language-plaintext highlighter-rouge">lineLengths</code> to be saved in memory after the first time it is computed.</p> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>To illustrate RDD basics, consider the simple program below:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span> |
| <span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">Integer</span><span class="o">></span> <span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">s</span> <span class="o">-></span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">());</span> |
| <span class="kt">int</span> <span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="na">reduce</span><span class="o">((</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="o">-></span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">);</span></code></pre></figure> |
| |
| <p>The first line defines a base RDD from an external file. This dataset is not loaded in memory or |
| otherwise acted on: <code class="language-plaintext highlighter-rouge">lines</code> is merely a pointer to the file. |
| The second line defines <code class="language-plaintext highlighter-rouge">lineLengths</code> as the result of a <code class="language-plaintext highlighter-rouge">map</code> transformation. Again, <code class="language-plaintext highlighter-rouge">lineLengths</code> |
| is <em>not</em> immediately computed, due to laziness. |
| Finally, we run <code class="language-plaintext highlighter-rouge">reduce</code>, which is an action. At this point Spark breaks the computation into tasks |
| to run on separate machines, and each machine runs both its part of the map and a local reduction, |
| returning only its answer to the driver program.</p> |
| |
| <p>If we also wanted to use <code class="language-plaintext highlighter-rouge">lineLengths</code> again later, we could add:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">lineLengths</span><span class="o">.</span><span class="na">persist</span><span class="o">(</span><span class="nc">StorageLevel</span><span class="o">.</span><span class="na">MEMORY_ONLY</span><span class="o">());</span></code></pre></figure> |
| |
| <p>before the <code class="language-plaintext highlighter-rouge">reduce</code>, which would cause <code class="language-plaintext highlighter-rouge">lineLengths</code> to be saved in memory after the first time it is computed.</p> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="passing-functions-to-spark">Passing Functions to Spark</h3> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>Spark’s API relies heavily on passing functions in the driver program to run on the cluster. |
| There are three recommended ways to do this:</p> |
| |
| <ul> |
| <li><a href="https://docs.python.org/3/tutorial/controlflow.html#lambda-expressions">Lambda expressions</a>, |
| for simple functions that can be written as an expression. (Lambdas do not support multi-statement |
| functions or statements that do not return a value.)</li> |
| <li>Local <code class="language-plaintext highlighter-rouge">def</code>s inside the function calling into Spark, for longer code.</li> |
| <li>Top-level functions in a module.</li> |
| </ul> |
| |
| <p>For example, to pass a longer function than can be supported using a <code class="language-plaintext highlighter-rouge">lambda</code>, consider |
| the code below:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="s">"""MyScript.py"""</span> |
| <span class="k">if</span> <span class="n">__name__</span> <span class="o">==</span> <span class="s">"__main__"</span><span class="p">:</span> |
| <span class="k">def</span> <span class="nf">myFunc</span><span class="p">(</span><span class="n">s</span><span class="p">):</span> |
| <span class="n">words</span> <span class="o">=</span> <span class="n">s</span><span class="p">.</span><span class="n">split</span><span class="p">(</span><span class="s">" "</span><span class="p">)</span> |
| <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">words</span><span class="p">)</span> |
| |
| <span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(...)</span> |
| <span class="n">sc</span><span class="p">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"file.txt"</span><span class="p">).</span><span class="nb">map</span><span class="p">(</span><span class="n">myFunc</span><span class="p">)</span></code></pre></figure> |
| |
| <p>Note that while it is also possible to pass a reference to a method in a class instance (as opposed to |
| a singleton object), this requires sending the whole object that the method belongs to along with the method. |
| For example, consider:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">MyClass</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">s</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">s</span> |
| <span class="k">def</span> <span class="nf">doStuff</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">rdd</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">rdd</span><span class="p">.</span><span class="nb">map</span><span class="p">(</span><span class="bp">self</span><span class="p">.</span><span class="n">func</span><span class="p">)</span></code></pre></figure> |
| |
| <p>Here, if we create a new <code class="language-plaintext highlighter-rouge">MyClass</code> instance and call <code class="language-plaintext highlighter-rouge">doStuff</code> on it, the <code class="language-plaintext highlighter-rouge">map</code> inside there references the |
| <code class="language-plaintext highlighter-rouge">func</code> method <em>of that <code class="language-plaintext highlighter-rouge">MyClass</code> instance</em>, so the whole object needs to be sent to the cluster.</p> |
| |
| <p>In a similar way, accessing fields of the outer object will reference the whole object:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">MyClass</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="p">.</span><span class="n">field</span> <span class="o">=</span> <span class="s">"Hello"</span> |
| <span class="k">def</span> <span class="nf">doStuff</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">rdd</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">rdd</span><span class="p">.</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="bp">self</span><span class="p">.</span><span class="n">field</span> <span class="o">+</span> <span class="n">s</span><span class="p">)</span></code></pre></figure> |
| |
| <p>To avoid this issue, the simplest way is to copy <code class="language-plaintext highlighter-rouge">field</code> into a local variable instead |
| of accessing it externally:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">def</span> <span class="nf">doStuff</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">rdd</span><span class="p">):</span> |
| <span class="n">field</span> <span class="o">=</span> <span class="bp">self</span><span class="p">.</span><span class="n">field</span> |
| <span class="k">return</span> <span class="n">rdd</span><span class="p">.</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="n">field</span> <span class="o">+</span> <span class="n">s</span><span class="p">)</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>Spark’s API relies heavily on passing functions in the driver program to run on the cluster. |
| There are two recommended ways to do this:</p> |
| |
| <ul> |
| <li><a href="https://docs.scala-lang.org/tour/basics.html#functions">Anonymous function syntax</a>, |
| which can be used for short pieces of code.</li> |
| <li>Static methods in a global singleton object. For example, you can define <code class="language-plaintext highlighter-rouge">object MyFunctions</code> and then |
| pass <code class="language-plaintext highlighter-rouge">MyFunctions.func1</code>, as follows:</li> |
| </ul> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">MyFunctions</span> <span class="o">{</span> |
| <span class="k">def</span> <span class="nf">func1</span><span class="o">(</span><span class="n">s</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="nv">myRdd</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="nv">MyFunctions</span><span class="o">.</span><span class="py">func1</span><span class="o">)</span></code></pre></figure> |
| |
| <p>Note that while it is also possible to pass a reference to a method in a class instance (as opposed to |
| a singleton object), this requires sending the whole object that the method belongs to along with the method. |
| For example, consider:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MyClass</span> <span class="o">{</span> |
| <span class="k">def</span> <span class="nf">func1</span><span class="o">(</span><span class="n">s</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span> |
| <span class="k">def</span> <span class="nf">doStuff</span><span class="o">(</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> <span class="nv">rdd</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">func1</span><span class="o">)</span> <span class="o">}</span> |
| <span class="o">}</span></code></pre></figure> |
| |
| <p>Here, if we create a new <code class="language-plaintext highlighter-rouge">MyClass</code> instance and call <code class="language-plaintext highlighter-rouge">doStuff</code> on it, the <code class="language-plaintext highlighter-rouge">map</code> inside there references the |
| <code class="language-plaintext highlighter-rouge">func1</code> method <em>of that <code class="language-plaintext highlighter-rouge">MyClass</code> instance</em>, so the whole object needs to be sent to the cluster. It is |
| similar to writing <code class="language-plaintext highlighter-rouge">rdd.map(x => this.func1(x))</code>.</p> |
| |
| <p>In a similar way, accessing fields of the outer object will reference the whole object:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MyClass</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="nv">field</span> <span class="k">=</span> <span class="s">"Hello"</span> |
| <span class="k">def</span> <span class="nf">doStuff</span><span class="o">(</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> <span class="nv">rdd</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="n">field</span> <span class="o">+</span> <span class="n">x</span><span class="o">)</span> <span class="o">}</span> |
| <span class="o">}</span></code></pre></figure> |
| |
| <p>is equivalent to writing <code class="language-plaintext highlighter-rouge">rdd.map(x => this.field + x)</code>, which references all of <code class="language-plaintext highlighter-rouge">this</code>. To avoid this |
| issue, the simplest way is to copy <code class="language-plaintext highlighter-rouge">field</code> into a local variable instead of accessing it externally:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="nf">doStuff</span><span class="o">(</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="nv">field_</span> <span class="k">=</span> <span class="k">this</span><span class="o">.</span><span class="py">field</span> |
| <span class="nv">rdd</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="n">field_</span> <span class="o">+</span> <span class="n">x</span><span class="o">)</span> |
| <span class="o">}</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>Spark’s API relies heavily on passing functions in the driver program to run on the cluster. |
| In Java, functions are represented by classes implementing the interfaces in the |
| <a href="api/java/index.html?org/apache/spark/api/java/function/package-summary.html">org.apache.spark.api.java.function</a> package. |
| There are two ways to create such functions:</p> |
| |
| <ul> |
| <li>Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, |
| and pass an instance of it to Spark.</li> |
| <li>Use <a href="https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html">lambda expressions</a> |
| to concisely define an implementation.</li> |
| </ul> |
| |
| <p>While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs |
| in long-form. For example, we could have written our code above as follows:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span> |
| <span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">Integer</span><span class="o">></span> <span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nc">Function</span><span class="o"><</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="nc">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="nc">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">();</span> <span class="o">}</span> |
| <span class="o">});</span> |
| <span class="kt">int</span> <span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nc">Function2</span><span class="o"><</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="nc">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="nc">Integer</span> <span class="n">a</span><span class="o">,</span> <span class="nc">Integer</span> <span class="n">b</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">;</span> <span class="o">}</span> |
| <span class="o">});</span></code></pre></figure> |
| |
| <p>Or, if writing the functions inline is unwieldy:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">GetLength</span> <span class="kd">implements</span> <span class="nc">Function</span><span class="o"><</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">></span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="nc">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="nc">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">();</span> <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="kd">class</span> <span class="nc">Sum</span> <span class="kd">implements</span> <span class="nc">Function2</span><span class="o"><</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">></span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="nc">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="nc">Integer</span> <span class="n">a</span><span class="o">,</span> <span class="nc">Integer</span> <span class="n">b</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">;</span> <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span> |
| <span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">Integer</span><span class="o">></span> <span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nc">GetLength</span><span class="o">());</span> |
| <span class="kt">int</span> <span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nc">Sum</span><span class="o">());</span></code></pre></figure> |
| |
| <p>Note that anonymous inner classes in Java can also access variables in the enclosing scope as long |
| as they are marked <code class="language-plaintext highlighter-rouge">final</code>. Spark will ship copies of these variables to each worker node as it does |
| for other languages.</p> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="understanding-closures-">Understanding closures <a name="ClosuresLink"></a></h3> |
| <p>One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses <code class="language-plaintext highlighter-rouge">foreach()</code> to increment a counter, but similar issues can occur for other operations as well.</p> |
| |
| <h4 id="example">Example</h4> |
| |
| <p>Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in <code class="language-plaintext highlighter-rouge">local</code> mode (<code class="language-plaintext highlighter-rouge">--master local[n]</code>) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">counter</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">parallelize</span><span class="p">(</span><span class="n">data</span><span class="p">)</span> |
| |
| <span class="c1"># Wrong: Don't do this!! |
| </span><span class="k">def</span> <span class="nf">increment_counter</span><span class="p">(</span><span class="n">x</span><span class="p">):</span> |
| <span class="k">global</span> <span class="n">counter</span> |
| <span class="n">counter</span> <span class="o">+=</span> <span class="n">x</span> |
| <span class="n">rdd</span><span class="p">.</span><span class="n">foreach</span><span class="p">(</span><span class="n">increment_counter</span><span class="p">)</span> |
| |
| <span class="k">print</span><span class="p">(</span><span class="s">"Counter value: "</span><span class="p">,</span> <span class="n">counter</span><span class="p">)</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">var</span> <span class="n">counter</span> <span class="k">=</span> <span class="mi">0</span> |
| <span class="k">var</span> <span class="n">rdd</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="n">data</span><span class="o">)</span> |
| |
| <span class="c1">// Wrong: Don't do this!!</span> |
| <span class="nv">rdd</span><span class="o">.</span><span class="py">foreach</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="n">counter</span> <span class="o">+=</span> <span class="n">x</span><span class="o">)</span> |
| |
| <span class="nf">println</span><span class="o">(</span><span class="s">"Counter value: "</span> <span class="o">+</span> <span class="n">counter</span><span class="o">)</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kt">int</span> <span class="n">counter</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> |
| <span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">Integer</span><span class="o">></span> <span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">parallelize</span><span class="o">(</span><span class="n">data</span><span class="o">);</span> |
| |
| <span class="c1">// Wrong: Don't do this!!</span> |
| <span class="n">rdd</span><span class="o">.</span><span class="na">foreach</span><span class="o">(</span><span class="n">x</span> <span class="o">-></span> <span class="n">counter</span> <span class="o">+=</span> <span class="n">x</span><span class="o">);</span> |
| |
| <span class="n">println</span><span class="o">(</span><span class="s">"Counter value: "</span> <span class="o">+</span> <span class="n">counter</span><span class="o">);</span></code></pre></figure> |
| |
| </div> |
| |
| </div> |
| |
| <h4 id="local-vs-cluster-modes">Local vs. cluster modes</h4> |
| |
| <p>The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s <strong>closure</strong>. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case <code class="language-plaintext highlighter-rouge">foreach()</code>). This closure is serialized and sent to each executor.</p> |
| |
| <p>The variables within the closure sent to each executor are now copies and thus, when <strong>counter</strong> is referenced within the <code class="language-plaintext highlighter-rouge">foreach</code> function, it’s no longer the <strong>counter</strong> on the driver node. There is still a <strong>counter</strong> in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of <strong>counter</strong> will still be zero since all operations on <strong>counter</strong> were referencing the value within the serialized closure.</p> |
| |
| <p>In local mode, in some circumstances, the <code class="language-plaintext highlighter-rouge">foreach</code> function will actually execute within the same JVM as the driver and will reference the same original <strong>counter</strong>, and may actually update it.</p> |
| |
| <p>To ensure well-defined behavior in these sorts of scenarios one should use an <a href="#accumulators"><code class="language-plaintext highlighter-rouge">Accumulator</code></a>. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.</p> |
| |
| <p>In general, closures - constructs like loops or locally defined methods - should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.</p> |
| |
| <h4 id="printing-elements-of-an-rdd">Printing elements of an RDD</h4> |
| <p>Another common idiom is attempting to print out the elements of an RDD using <code class="language-plaintext highlighter-rouge">rdd.foreach(println)</code> or <code class="language-plaintext highlighter-rouge">rdd.map(println)</code>. On a single machine, this will generate the expected output and print all the RDD’s elements. However, in <code class="language-plaintext highlighter-rouge">cluster</code> mode, the executors write this output to their own <code class="language-plaintext highlighter-rouge">stdout</code>, not the one on the driver, so <code class="language-plaintext highlighter-rouge">stdout</code> on the driver won’t show these! To print all elements on the driver, one can use the <code class="language-plaintext highlighter-rouge">collect()</code> method to first bring the RDD to the driver node thus: <code class="language-plaintext highlighter-rouge">rdd.collect().foreach(println)</code>. This can cause the driver to run out of memory, though, because <code class="language-plaintext highlighter-rouge">collect()</code> fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use <code class="language-plaintext highlighter-rouge">take()</code>: <code class="language-plaintext highlighter-rouge">rdd.take(100).foreach(println)</code>.</p> |
| |
| <h3 id="working-with-key-value-pairs">Working with Key-Value Pairs</h3> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>While most Spark operations work on RDDs containing any type of objects, a few special operations are |
| only available on RDDs of key-value pairs. |
| The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements |
| by a key.</p> |
| |
| <p>In Python, these operations work on RDDs containing built-in Python tuples such as <code class="language-plaintext highlighter-rouge">(1, 2)</code>. |
| Simply create such tuples and then call your desired operation.</p> |
| |
| <p>For example, the following code uses the <code class="language-plaintext highlighter-rouge">reduceByKey</code> operation on key-value pairs to count how |
| many times each line of text occurs in a file:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"data.txt"</span><span class="p">)</span> |
| <span class="n">pairs</span> <span class="o">=</span> <span class="n">lines</span><span class="p">.</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="p">(</span><span class="n">s</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span> |
| <span class="n">counts</span> <span class="o">=</span> <span class="n">pairs</span><span class="p">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="p">)</span></code></pre></figure> |
| |
| <p>We could also use <code class="language-plaintext highlighter-rouge">counts.sortByKey()</code>, for example, to sort the pairs alphabetically, and finally |
| <code class="language-plaintext highlighter-rouge">counts.collect()</code> to bring them back to the driver program as a list of objects.</p> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>While most Spark operations work on RDDs containing any type of objects, a few special operations are |
| only available on RDDs of key-value pairs. |
| The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements |
| by a key.</p> |
| |
| <p>In Scala, these operations are automatically available on RDDs containing |
| <a href="http://www.scala-lang.org/api/2.12.18/index.html#scala.Tuple2">Tuple2</a> objects |
| (the built-in tuples in the language, created by simply writing <code class="language-plaintext highlighter-rouge">(a, b)</code>). The key-value pair operations are available in the |
| <a href="api/scala/org/apache/spark/rdd/PairRDDFunctions.html">PairRDDFunctions</a> class, |
| which automatically wraps around an RDD of tuples.</p> |
| |
| <p>For example, the following code uses the <code class="language-plaintext highlighter-rouge">reduceByKey</code> operation on key-value pairs to count how |
| many times each line of text occurs in a file:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">lines</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">)</span> |
| <span class="k">val</span> <span class="nv">pairs</span> <span class="k">=</span> <span class="nv">lines</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">s</span> <span class="k">=></span> <span class="o">(</span><span class="n">s</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> |
| <span class="k">val</span> <span class="nv">counts</span> <span class="k">=</span> <span class="nv">pairs</span><span class="o">.</span><span class="py">reduceByKey</span><span class="o">((</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="k">=></span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">)</span></code></pre></figure> |
| |
| <p>We could also use <code class="language-plaintext highlighter-rouge">counts.sortByKey()</code>, for example, to sort the pairs alphabetically, and finally |
| <code class="language-plaintext highlighter-rouge">counts.collect()</code> to bring them back to the driver program as an array of objects.</p> |
| |
| <p><strong>Note:</strong> when using custom objects as the key in key-value pair operations, you must be sure that a |
| custom <code class="language-plaintext highlighter-rouge">equals()</code> method is accompanied with a matching <code class="language-plaintext highlighter-rouge">hashCode()</code> method. For full details, see |
| the contract outlined in the <a href="https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#hashCode--">Object.hashCode() |
| documentation</a>.</p> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>While most Spark operations work on RDDs containing any type of objects, a few special operations are |
| only available on RDDs of key-value pairs. |
| The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements |
| by a key.</p> |
| |
| <p>In Java, key-value pairs are represented using the |
| <a href="http://www.scala-lang.org/api/2.12.18/index.html#scala.Tuple2">scala.Tuple2</a> class |
| from the Scala standard library. You can simply call <code class="language-plaintext highlighter-rouge">new Tuple2(a, b)</code> to create a tuple, and access |
| its fields later with <code class="language-plaintext highlighter-rouge">tuple._1()</code> and <code class="language-plaintext highlighter-rouge">tuple._2()</code>.</p> |
| |
| <p>RDDs of key-value pairs are represented by the |
| <a href="api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html">JavaPairRDD</a> class. You can construct |
| JavaPairRDDs from JavaRDDs using special versions of the <code class="language-plaintext highlighter-rouge">map</code> operations, like |
| <code class="language-plaintext highlighter-rouge">mapToPair</code> and <code class="language-plaintext highlighter-rouge">flatMapToPair</code>. The JavaPairRDD will have both standard RDD functions and special |
| key-value ones.</p> |
| |
| <p>For example, the following code uses the <code class="language-plaintext highlighter-rouge">reduceByKey</code> operation on key-value pairs to count how |
| many times each line of text occurs in a file:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">JavaRDD</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">lines</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span> |
| <span class="nc">JavaPairRDD</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">pairs</span> <span class="k">=</span> <span class="nv">lines</span><span class="o">.</span><span class="py">mapToPair</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="nc">Tuple2</span><span class="o">(</span><span class="n">s</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span> |
| <span class="nc">JavaPairRDD</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">counts</span> <span class="k">=</span> <span class="nv">pairs</span><span class="o">.</span><span class="py">reduceByKey</span><span class="o">((</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">);</span></code></pre></figure> |
| |
| <p>We could also use <code class="language-plaintext highlighter-rouge">counts.sortByKey()</code>, for example, to sort the pairs alphabetically, and finally |
| <code class="language-plaintext highlighter-rouge">counts.collect()</code> to bring them back to the driver program as a list of objects.</p> |
| |
| <p><strong>Note:</strong> when using custom objects as the key in key-value pair operations, you must be sure that a |
| custom <code class="language-plaintext highlighter-rouge">equals()</code> method is accompanied with a matching <code class="language-plaintext highlighter-rouge">hashCode()</code> method. For full details, see |
| the contract outlined in the <a href="https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#hashCode--">Object.hashCode() |
| documentation</a>.</p> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="transformations">Transformations</h3> |
| |
| <p>The following table lists some of the common transformations supported by Spark. Refer to the |
| RDD API doc |
| (<a href="api/scala/org/apache/spark/rdd/RDD.html">Scala</a>, |
| <a href="api/java/index.html?org/apache/spark/api/java/JavaRDD.html">Java</a>, |
| <a href="api/python/reference/api/pyspark.RDD.html#pyspark.RDD">Python</a>, |
| <a href="api/R/reference/index.html">R</a>) |
| and pair RDD functions doc |
| (<a href="api/scala/org/apache/spark/rdd/PairRDDFunctions.html">Scala</a>, |
| <a href="api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html">Java</a>) |
| for details.</p> |
| |
| <table class="table table-striped"> |
| <thead><tr><th style="width:25%">Transformation</th><th>Meaning</th></tr></thead> |
| <tr> |
| <td> <b>map</b>(<i>func</i>) </td> |
| <td> Return a new distributed dataset formed by passing each element of the source through a function <i>func</i>. </td> |
| </tr> |
| <tr> |
| <td> <b>filter</b>(<i>func</i>) </td> |
| <td> Return a new dataset formed by selecting those elements of the source on which <i>func</i> returns true. </td> |
| </tr> |
| <tr> |
| <td> <b>flatMap</b>(<i>func</i>) </td> |
| <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td> |
| </tr> |
| <tr> |
| <td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td> |
| <td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type |
| Iterator&lt;T&gt; =&gt; Iterator&lt;U&gt; when running on an RDD of type T. </td> |
| </tr> |
| <tr> |
| <td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td> |
| <td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of |
| the partition, so <i>func</i> must be of type (Int, Iterator&lt;T&gt;) =&gt; Iterator&lt;U&gt; when running on an RDD of type T. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>sample</b>(<i>withReplacement</i>, <i>fraction</i>, <i>seed</i>) </td> |
| <td> Sample a fraction <i>fraction</i> of the data, with or without replacement, using a given random number generator seed. </td> |
| </tr> |
| <tr> |
| <td> <b>union</b>(<i>otherDataset</i>) </td> |
| <td> Return a new dataset that contains the union of the elements in the source dataset and the argument. </td> |
| </tr> |
| <tr> |
| <td> <b>intersection</b>(<i>otherDataset</i>) </td> |
| <td> Return a new RDD that contains the intersection of elements in the source dataset and the argument. </td> |
| </tr> |
| <tr> |
| <td> <b>distinct</b>([<i>numPartitions</i>]) </td> |
| <td> Return a new dataset that contains the distinct elements of the source dataset.</td> |
| </tr> |
| <tr> |
| <td> <b>groupByKey</b>([<i>numPartitions</i>]) <a name="GroupByLink"></a> </td> |
| <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br /> |
| <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or |
| average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better |
| performance. |
| <br /> |
| <b>Note:</b> By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. |
| You can pass an optional <code>numPartitions</code> argument to set a different number of tasks. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>reduceByKey</b>(<i>func</i>, [<i>numPartitions</i>]) <a name="ReduceByLink"></a> </td> |
| <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td> |
| </tr> |
| <tr> |
| <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numPartitions</i>]) <a name="AggregateByLink"></a> </td> |
| <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through the optional <code>numPartitions</code> argument. </td> |
| </tr> |
| <tr> |
| <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numPartitions</i>]) <a name="SortByLink"></a> </td> |
| <td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td> |
| </tr> |
| <tr> |
| <td> <b>join</b>(<i>otherDataset</i>, [<i>numPartitions</i>]) <a name="JoinLink"></a> </td> |
| <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. |
| Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numPartitions</i>]) <a name="CogroupLink"></a> </td> |
| <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td> |
| </tr> |
| <tr> |
| <td> <b>cartesian</b>(<i>otherDataset</i>) </td> |
| <td> When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). </td> |
| </tr> |
| <tr> |
| <td> <b>pipe</b>(<i>command</i>, <i>[envVars]</i>) </td> |
| <td> Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the |
| process's stdin and lines output to its stdout are returned as an RDD of strings. </td> |
| </tr> |
| <tr> |
| <td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td> |
| <td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently |
| after filtering down a large dataset. </td> |
| </tr> |
| <tr> |
| <td> <b>repartition</b>(<i>numPartitions</i>) </td> |
| <td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. |
| This always shuffles all data over the network. <a name="RepartitionLink"></a></td> |
| </tr> |
| <tr> |
| <td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td> |
| <td> Repartition the RDD according to the given partitioner and, within each resulting partition, |
| sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within |
| each partition because it can push the sorting down into the shuffle machinery. </td> |
| </tr> |
| </table> |
| |
| <h3 id="actions">Actions</h3> |
| |
| <p>The following table lists some of the common actions supported by Spark. Refer to the |
| RDD API doc |
| (<a href="api/scala/org/apache/spark/rdd/RDD.html">Scala</a>, |
| <a href="api/java/index.html?org/apache/spark/api/java/JavaRDD.html">Java</a>, |
| <a href="api/python/reference/api/pyspark.RDD.html#pyspark.RDD">Python</a>, |
| <a href="api/R/reference/index.html">R</a>) |
| and pair RDD functions doc |
| (<a href="api/scala/org/apache/spark/rdd/PairRDDFunctions.html">Scala</a>, |
| <a href="api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html">Java</a>) |
| for details.</p> |
| |
| <table class="table table-striped"> |
| <thead><tr><th>Action</th><th>Meaning</th></tr></thead> |
| <tr> |
| <td> <b>reduce</b>(<i>func</i>) </td> |
| <td> Aggregate the elements of the dataset using a function <i>func</i> (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. </td> |
| </tr> |
| <tr> |
| <td> <b>collect</b>() </td> |
| <td> Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. </td> |
| </tr> |
| <tr> |
| <td> <b>count</b>() </td> |
| <td> Return the number of elements in the dataset. </td> |
| </tr> |
| <tr> |
| <td> <b>first</b>() </td> |
| <td> Return the first element of the dataset (similar to take(1)). </td> |
| </tr> |
| <tr> |
| <td> <b>take</b>(<i>n</i>) </td> |
| <td> Return an array with the first <i>n</i> elements of the dataset. </td> |
| </tr> |
| <tr> |
| <td> <b>takeSample</b>(<i>withReplacement</i>, <i>num</i>, [<i>seed</i>]) </td> |
| <td> Return an array with a random sample of <i>num</i> elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.</td> |
| </tr> |
| <tr> |
| <td> <b>takeOrdered</b>(<i>n</i>, <i>[ordering]</i>) </td> |
| <td> Return the first <i>n</i> elements of the RDD using either their natural order or a custom comparator. </td> |
| </tr> |
| <tr> |
| <td> <b>saveAsTextFile</b>(<i>path</i>) </td> |
| <td> Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. </td> |
| </tr> |
| <tr> |
| <td> <b>saveAsSequenceFile</b>(<i>path</i>) <br /> (Java and Scala) </td> |
| <td> Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also |
| available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). </td> |
| </tr> |
| <tr> |
| <td> <b>saveAsObjectFile</b>(<i>path</i>) <br /> (Java and Scala) </td> |
| <td> Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using |
| <code>SparkContext.objectFile()</code>. </td> |
| </tr> |
| <tr> |
| <td> <b>countByKey</b>() <a name="CountByLink"></a> </td> |
| <td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Long) pairs with the count of each key. </td> |
| </tr> |
| <tr> |
| <td> <b>foreach</b>(<i>func</i>) </td> |
| <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#accumulators">Accumulator</a> or interacting with external storage systems. |
| <br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#understanding-closures-a-nameclosureslinka">Understanding closures</a> for more details.</td> |
| </tr> |
| </table> |
| |
| <p>The Spark RDD API also exposes asynchronous versions of some actions, like <code class="language-plaintext highlighter-rouge">foreachAsync</code> for <code class="language-plaintext highlighter-rouge">foreach</code>, which immediately return a <code class="language-plaintext highlighter-rouge">FutureAction</code> to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.</p> |
| |
| <h3 id="shuffle-operations">Shuffle operations</h3> |
| |
| <p>Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s |
| mechanism for re-distributing data so that it’s grouped differently across partitions. This typically |
| involves copying data across executors and machines, making the shuffle a complex and |
| costly operation.</p> |
| |
| <h4 id="background">Background</h4> |
| |
| <p>To understand what happens during the shuffle, we can consider the example of the |
| <a href="#ReduceByLink"><code class="language-plaintext highlighter-rouge">reduceByKey</code></a> operation. The <code class="language-plaintext highlighter-rouge">reduceByKey</code> operation generates a new RDD where all |
| values for a single key are combined into a tuple - the key and the result of executing a reduce |
| function against all values associated with that key. The challenge is that not all values for a |
| single key necessarily reside on the same partition, or even the same machine, but they must be |
| co-located to compute the result.</p> |
| |
| <p>In Spark, data is generally not distributed across partitions to be in the necessary place for a |
| specific operation. During computations, a single task will operate on a single partition - thus, to |
| organize all the data for a single <code class="language-plaintext highlighter-rouge">reduceByKey</code> reduce task to execute, Spark needs to perform an |
| all-to-all operation. It must read from all partitions to find all the values for all keys, |
| and then bring together values across partitions to compute the final result for each key - |
| this is called the <strong>shuffle</strong>.</p> |
| |
| <p>Although the set of elements in each partition of newly shuffled data will be deterministic, and so |
| is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably |
| ordered data following shuffle then it’s possible to use:</p> |
| |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">mapPartitions</code> to sort each partition using, for example, <code class="language-plaintext highlighter-rouge">.sorted</code></li> |
| <li><code class="language-plaintext highlighter-rouge">repartitionAndSortWithinPartitions</code> to efficiently sort partitions while simultaneously repartitioning</li> |
| <li><code class="language-plaintext highlighter-rouge">sortBy</code> to make a globally ordered RDD</li> |
| </ul> |
| |
| <p>Operations which can cause a shuffle include <strong>repartition</strong> operations like |
| <a href="#RepartitionLink"><code class="language-plaintext highlighter-rouge">repartition</code></a> and <a href="#CoalesceLink"><code class="language-plaintext highlighter-rouge">coalesce</code></a>, <strong>‘ByKey</strong> operations |
| (except for counting) like <a href="#GroupByLink"><code class="language-plaintext highlighter-rouge">groupByKey</code></a> and <a href="#ReduceByLink"><code class="language-plaintext highlighter-rouge">reduceByKey</code></a>, and |
| <strong>join</strong> operations like <a href="#CogroupLink"><code class="language-plaintext highlighter-rouge">cogroup</code></a> and <a href="#JoinLink"><code class="language-plaintext highlighter-rouge">join</code></a>.</p> |
| |
| <h4 id="performance-impact">Performance Impact</h4> |
| <p>The <strong>Shuffle</strong> is an expensive operation since it involves disk I/O, data serialization, and |
| network I/O. To organize data for the shuffle, Spark generates sets of tasks - <em>map</em> tasks to |
| organize the data, and a set of <em>reduce</em> tasks to aggregate it. This nomenclature comes from |
| MapReduce and does not directly relate to Spark’s <code class="language-plaintext highlighter-rouge">map</code> and <code class="language-plaintext highlighter-rouge">reduce</code> operations.</p> |
| |
| <p>Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these |
| are sorted based on the target partition and written to a single file. On the reduce side, tasks |
| read the relevant sorted blocks.</p> |
| |
| <p>Certain shuffle operations can consume significant amounts of heap memory since they employ |
| in-memory data structures to organize records before or after transferring them. Specifically, |
| <code class="language-plaintext highlighter-rouge">reduceByKey</code> and <code class="language-plaintext highlighter-rouge">aggregateByKey</code> create these structures on the map side, and <code class="language-plaintext highlighter-rouge">'ByKey</code> operations |
| generate these on the reduce side. When data does not fit in memory Spark will spill these tables |
| to disk, incurring the additional overhead of disk I/O and increased garbage collection.</p> |
| |
| <p>Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files |
| are preserved until the corresponding RDDs are no longer used and are garbage collected. |
| This is done so the shuffle files don’t need to be re-created if the lineage is re-computed. |
| Garbage collection may happen only after a long period of time, if the application retains references |
| to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may |
| consume a large amount of disk space. The temporary storage directory is specified by the |
| <code class="language-plaintext highlighter-rouge">spark.local.dir</code> configuration parameter when configuring the Spark context.</p> |
| |
| <p>Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the |
| ‘Shuffle Behavior’ section within the <a href="configuration.html">Spark Configuration Guide</a>.</p> |
| |
| <h2 id="rdd-persistence">RDD Persistence</h2> |
| |
| <p>One of the most important capabilities in Spark is <em>persisting</em> (or <em>caching</em>) a dataset in memory |
| across operations. When you persist an RDD, each node stores any partitions of it that it computes in |
| memory and reuses them in other actions on that dataset (or datasets derived from it). This allows |
| future actions to be much faster (often by more than 10x). Caching is a key tool for |
| iterative algorithms and fast interactive use.</p> |
| |
| <p>You can mark an RDD to be persisted using the <code class="language-plaintext highlighter-rouge">persist()</code> or <code class="language-plaintext highlighter-rouge">cache()</code> methods on it. The first time |
| it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – |
| if any partition of an RDD is lost, it will automatically be recomputed using the transformations |
| that originally created it.</p> |
| |
| <p>In addition, each persisted RDD can be stored using a different <em>storage level</em>, allowing you, for example, |
| to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), |
| or replicate it across nodes. |
| These levels are set by passing a |
| <code class="language-plaintext highlighter-rouge">StorageLevel</code> object (<a href="api/scala/org/apache/spark/storage/StorageLevel.html">Scala</a>, |
| <a href="api/java/index.html?org/apache/spark/storage/StorageLevel.html">Java</a>, |
| <a href="api/python/reference/api/pyspark.StorageLevel.html#pyspark.StorageLevel">Python</a>) |
| to <code class="language-plaintext highlighter-rouge">persist()</code>. The <code class="language-plaintext highlighter-rouge">cache()</code> method is a shorthand for using the default storage level, |
| which is <code class="language-plaintext highlighter-rouge">StorageLevel.MEMORY_ONLY</code> (store deserialized objects in memory). The full set of |
| storage levels is:</p> |
| |
| <table class="table table-striped"> |
| <thead><tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr></thead> |
| <tr> |
| <td> MEMORY_ONLY </td> |
| <td> Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will |
| not be cached and will be recomputed on the fly each time they're needed. This is the default level. </td> |
| </tr> |
| <tr> |
| <td> MEMORY_AND_DISK </td> |
| <td> Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the |
| partitions that don't fit on disk, and read them from there when they're needed. </td> |
| </tr> |
| <tr> |
| <td> MEMORY_ONLY_SER <br /> (Java and Scala) </td> |
| <td> Store RDD as <i>serialized</i> Java objects (one byte array per partition). |
| This is generally more space-efficient than deserialized objects, especially when using a |
| <a href="tuning.html">fast serializer</a>, but more CPU-intensive to read. |
| </td> |
| </tr> |
| <tr> |
| <td> MEMORY_AND_DISK_SER <br /> (Java and Scala) </td> |
| <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of |
| recomputing them on the fly each time they're needed. </td> |
| </tr> |
| <tr> |
| <td> DISK_ONLY </td> |
| <td> Store the RDD partitions only on disk. </td> |
| </tr> |
| <tr> |
| <td> MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. </td> |
| <td> Same as the levels above, but replicate each partition on two cluster nodes. </td> |
| </tr> |
| <tr> |
| <td> OFF_HEAP (experimental) </td> |
| <td> Similar to MEMORY_ONLY_SER, but store the data in |
| <a href="configuration.html#memory-management">off-heap memory</a>. This requires off-heap memory to be enabled. </td> |
| </tr> |
| </table> |
| |
| <p><strong>Note:</strong> <em>In Python, stored objects will always be serialized with the <a href="https://docs.python.org/3/library/pickle.html">Pickle</a> library, |
| so it does not matter whether you choose a serialized level. The available storage levels in Python include <code class="language-plaintext highlighter-rouge">MEMORY_ONLY</code>, <code class="language-plaintext highlighter-rouge">MEMORY_ONLY_2</code>, |
| <code class="language-plaintext highlighter-rouge">MEMORY_AND_DISK</code>, <code class="language-plaintext highlighter-rouge">MEMORY_AND_DISK_2</code>, <code class="language-plaintext highlighter-rouge">DISK_ONLY</code>, <code class="language-plaintext highlighter-rouge">DISK_ONLY_2</code>, and <code class="language-plaintext highlighter-rouge">DISK_ONLY_3</code>.</em></p> |
| |
| <p>Spark also automatically persists some intermediate data in shuffle operations (e.g. <code class="language-plaintext highlighter-rouge">reduceByKey</code>), even without users calling <code class="language-plaintext highlighter-rouge">persist</code>. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call <code class="language-plaintext highlighter-rouge">persist</code> on the resulting RDD if they plan to reuse it.</p> |
| |
| <h3 id="which-storage-level-to-choose">Which Storage Level to Choose?</h3> |
| |
| <p>Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU |
| efficiency. We recommend going through the following process to select one:</p> |
| |
| <ul> |
| <li> |
| <p>If your RDDs fit comfortably with the default storage level (<code class="language-plaintext highlighter-rouge">MEMORY_ONLY</code>), leave them that way. |
| This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.</p> |
| </li> |
| <li> |
| <p>If not, try using <code class="language-plaintext highlighter-rouge">MEMORY_ONLY_SER</code> and <a href="tuning.html">selecting a fast serialization library</a> to |
| make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)</p> |
| </li> |
| <li> |
| <p>Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter |
| a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from |
| disk.</p> |
| </li> |
| <li> |
| <p>Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve |
| requests from a web application). <em>All</em> the storage levels provide full fault tolerance by |
| recomputing lost data, but the replicated ones let you continue running tasks on the RDD without |
| waiting to recompute a lost partition.</p> |
| </li> |
| </ul> |
| |
| <h3 id="removing-data">Removing Data</h3> |
| |
| <p>Spark automatically monitors cache usage on each node and drops out old data partitions in a |
| least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for |
| it to fall out of the cache, use the <code class="language-plaintext highlighter-rouge">RDD.unpersist()</code> method. Note that this method does not |
| block by default. To block until resources are freed, specify <code class="language-plaintext highlighter-rouge">blocking=true</code> when calling this method.</p> |
| |
| <h1 id="shared-variables">Shared Variables</h1> |
| |
| <p>Normally, when a function passed to a Spark operation (such as <code class="language-plaintext highlighter-rouge">map</code> or <code class="language-plaintext highlighter-rouge">reduce</code>) is executed on a |
| remote cluster node, it works on separate copies of all the variables used in the function. These |
| variables are copied to each machine, and no updates to the variables on the remote machine are |
| propagated back to the driver program. Supporting general, read-write shared variables across tasks |
| would be inefficient. However, Spark does provide two limited types of <em>shared variables</em> for two |
| common usage patterns: broadcast variables and accumulators.</p> |
| |
| <h2 id="broadcast-variables">Broadcast Variables</h2> |
| |
| <p>Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather |
| than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a |
| large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables |
| using efficient broadcast algorithms to reduce communication cost.</p> |
| |
| <p>Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. |
| Spark automatically broadcasts the common data needed by tasks within each stage. The data |
| broadcast this way is cached in serialized form and deserialized before running each task. This |
| means that explicitly creating broadcast variables is only useful when tasks across multiple stages |
| need the same data or when caching the data in deserialized form is important.</p> |
| |
| <p>Broadcast variables are created from a variable <code class="language-plaintext highlighter-rouge">v</code> by calling <code class="language-plaintext highlighter-rouge">SparkContext.broadcast(v)</code>. The |
| broadcast variable is a wrapper around <code class="language-plaintext highlighter-rouge">v</code>, and its value can be accessed by calling the <code class="language-plaintext highlighter-rouge">value</code> |
| method. The code below shows this:</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">>>></span> <span class="n">broadcastVar</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">broadcast</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">])</span> |
| <span class="o"><</span><span class="n">pyspark</span><span class="p">.</span><span class="n">broadcast</span><span class="p">.</span><span class="n">Broadcast</span> <span class="nb">object</span> <span class="n">at</span> <span class="mh">0x102789f10</span><span class="o">></span> |
| |
| <span class="o">>>></span> <span class="n">broadcastVar</span><span class="p">.</span><span class="n">value</span> |
| <span class="p">[</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">]</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">scala</span><span class="o">></span> <span class="k">val</span> <span class="nv">broadcastVar</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">broadcast</span><span class="o">(</span><span class="nc">Array</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">))</span> |
| <span class="n">broadcastVar</span><span class="k">:</span> <span class="kt">org.apache.spark.broadcast.Broadcast</span><span class="o">[</span><span class="kt">Array</span><span class="o">[</span><span class="kt">Int</span><span class="o">]]</span> <span class="k">=</span> <span class="nc">Broadcast</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> |
| |
| <span class="n">scala</span><span class="o">></span> <span class="nv">broadcastVar</span><span class="o">.</span><span class="py">value</span> |
| <span class="n">res0</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">)</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">Broadcast</span><span class="o"><</span><span class="kt">int</span><span class="o">[]></span> <span class="n">broadcastVar</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">broadcast</span><span class="o">(</span><span class="k">new</span> <span class="kt">int</span><span class="o">[]</span> <span class="o">{</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">});</span> |
| |
| <span class="n">broadcastVar</span><span class="o">.</span><span class="na">value</span><span class="o">();</span> |
| <span class="c1">// returns [1, 2, 3]</span></code></pre></figure> |
| |
| </div> |
| |
| </div> |
| |
| <p>After the broadcast variable is created, it should be used instead of the value <code class="language-plaintext highlighter-rouge">v</code> in any functions |
| run on the cluster so that <code class="language-plaintext highlighter-rouge">v</code> is not shipped to the nodes more than once. In addition, the object |
| <code class="language-plaintext highlighter-rouge">v</code> should not be modified after it is broadcast in order to ensure that all nodes get the same |
| value of the broadcast variable (e.g. if the variable is shipped to a new node later).</p> |
| |
| <p>To release the resources that the broadcast variable copied onto executors, call <code class="language-plaintext highlighter-rouge">.unpersist()</code>. |
| If the broadcast is used again afterwards, it will be re-broadcast. To permanently release all |
| resources used by the broadcast variable, call <code class="language-plaintext highlighter-rouge">.destroy()</code>. The broadcast variable can’t be used |
| after that. Note that these methods do not block by default. To block until resources are freed, |
| specify <code class="language-plaintext highlighter-rouge">blocking=true</code> when calling them.</p> |
| |
| <h2 id="accumulators">Accumulators</h2> |
| |
| <p>Accumulators are variables that are only “added” to through an associative and commutative operation and can |
| therefore be efficiently supported in parallel. They can be used to implement counters (as in |
| MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers |
| can add support for new types.</p> |
| |
| <p>As a user, you can create named or unnamed accumulators. As seen in the image below, a named accumulator (in this instance <code class="language-plaintext highlighter-rouge">counter</code>) will display in the web UI for the stage that modifies that accumulator. Spark displays the value for each accumulator modified by a task in the “Tasks” table.</p> |
| |
| <p style="text-align: center;"> |
| <img src="img/spark-webui-accumulators.png" title="Accumulators in the Spark UI" alt="Accumulators in the Spark UI" /> |
| </p> |
| |
| <p>Tracking accumulators in the UI can be useful for understanding the progress of |
| running stages (NOTE: this is not yet supported in Python).</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <p>An accumulator is created from an initial value <code class="language-plaintext highlighter-rouge">v</code> by calling <code class="language-plaintext highlighter-rouge">SparkContext.accumulator(v)</code>. Tasks |
| running on a cluster can then add to it using the <code class="language-plaintext highlighter-rouge">add</code> method or the <code class="language-plaintext highlighter-rouge">+=</code> operator. However, they cannot read its value. |
| Only the driver program can read the accumulator’s value, using its <code class="language-plaintext highlighter-rouge">value</code> method.</p> |
| |
| <p>The code below shows an accumulator being used to add up the elements of an array:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">>>></span> <span class="n">accum</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">accumulator</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> |
| <span class="o">>>></span> <span class="n">accum</span> |
| <span class="n">Accumulator</span><span class="o"><</span><span class="nb">id</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="mi">0</span><span class="o">></span> |
| |
| <span class="o">>>></span> <span class="n">sc</span><span class="p">.</span><span class="n">parallelize</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">,</span> <span class="mi">4</span><span class="p">]).</span><span class="n">foreach</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">accum</span><span class="p">.</span><span class="n">add</span><span class="p">(</span><span class="n">x</span><span class="p">))</span> |
| <span class="p">...</span> |
| <span class="mi">10</span><span class="o">/</span><span class="mi">09</span><span class="o">/</span><span class="mi">29</span> <span class="mi">18</span><span class="p">:</span><span class="mi">41</span><span class="p">:</span><span class="mi">08</span> <span class="n">INFO</span> <span class="n">SparkContext</span><span class="p">:</span> <span class="n">Tasks</span> <span class="n">finished</span> <span class="ow">in</span> <span class="mf">0.317106</span> <span class="n">s</span> |
| |
| <span class="o">>>></span> <span class="n">accum</span><span class="p">.</span><span class="n">value</span> |
| <span class="mi">10</span></code></pre></figure> |
| |
| <p>While this code used the built-in support for accumulators of type Int, programmers can also |
| create their own types by subclassing <a href="api/python/reference/api/pyspark.AccumulatorParam.html#pyspark.AccumulatorParam">AccumulatorParam</a>. |
| The AccumulatorParam interface has two methods: <code class="language-plaintext highlighter-rouge">zero</code> for providing a “zero value” for your data |
| type, and <code class="language-plaintext highlighter-rouge">addInPlace</code> for adding two values together. For example, supposing we had a <code class="language-plaintext highlighter-rouge">Vector</code> class |
| representing mathematical vectors, we could write:</p> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">VectorAccumulatorParam</span><span class="p">(</span><span class="n">AccumulatorParam</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">zero</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">initialValue</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">Vector</span><span class="p">.</span><span class="n">zeros</span><span class="p">(</span><span class="n">initialValue</span><span class="p">.</span><span class="n">size</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">addInPlace</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">v1</span><span class="p">,</span> <span class="n">v2</span><span class="p">):</span> |
| <span class="n">v1</span> <span class="o">+=</span> <span class="n">v2</span> |
| <span class="k">return</span> <span class="n">v1</span> |
| |
| <span class="c1"># Then, create an Accumulator of this type: |
| </span><span class="n">vecAccum</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">accumulator</span><span class="p">(</span><span class="n">Vector</span><span class="p">(...),</span> <span class="n">VectorAccumulatorParam</span><span class="p">())</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <p>A numeric accumulator can be created by calling <code class="language-plaintext highlighter-rouge">SparkContext.longAccumulator()</code> or <code class="language-plaintext highlighter-rouge">SparkContext.doubleAccumulator()</code> |
| to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using |
| the <code class="language-plaintext highlighter-rouge">add</code> method. However, they cannot read its value. Only the driver program can read the accumulator’s value, |
| using its <code class="language-plaintext highlighter-rouge">value</code> method.</p> |
| |
| <p>The code below shows an accumulator being used to add up the elements of an array:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">scala</span><span class="o">></span> <span class="k">val</span> <span class="nv">accum</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">longAccumulator</span><span class="o">(</span><span class="s">"My Accumulator"</span><span class="o">)</span> |
| <span class="n">accum</span><span class="k">:</span> <span class="kt">org.apache.spark.util.LongAccumulator</span> <span class="o">=</span> <span class="nc">LongAccumulator</span><span class="o">(</span><span class="n">id</span><span class="k">:</span> <span class="err">0</span><span class="o">,</span> <span class="n">name</span><span class="k">:</span> <span class="kt">Some</span><span class="o">(</span><span class="kt">My</span> <span class="kt">Accumulator</span><span class="o">),</span> <span class="n">value</span><span class="k">:</span> <span class="err">0</span><span class="o">)</span> |
| |
| <span class="n">scala</span><span class="o">></span> <span class="nv">sc</span><span class="o">.</span><span class="py">parallelize</span><span class="o">(</span><span class="nc">Array</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">)).</span><span class="py">foreach</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="nv">accum</span><span class="o">.</span><span class="py">add</span><span class="o">(</span><span class="n">x</span><span class="o">))</span> |
| <span class="o">...</span> |
| <span class="mi">10</span><span class="o">/</span><span class="mi">09</span><span class="o">/</span><span class="mi">29</span> <span class="mi">18</span><span class="k">:</span><span class="err">41</span><span class="kt">:</span><span class="err">08</span> <span class="kt">INFO</span> <span class="kt">SparkContext:</span> <span class="kt">Tasks</span> <span class="kt">finished</span> <span class="kt">in</span> <span class="err">0</span><span class="kt">.</span><span class="err">317106</span> <span class="kt">s</span> |
| |
| <span class="n">scala</span><span class="o">></span> <span class="nv">accum</span><span class="o">.</span><span class="py">value</span> |
| <span class="n">res2</span><span class="k">:</span> <span class="kt">Long</span> <span class="o">=</span> <span class="mi">10</span></code></pre></figure> |
| |
| <p>While this code used the built-in support for accumulators of type Long, programmers can also |
| create their own types by subclassing <a href="api/scala/org/apache/spark/util/AccumulatorV2.html">AccumulatorV2</a>. |
| The AccumulatorV2 abstract class has several methods which one has to override: <code class="language-plaintext highlighter-rouge">reset</code> for resetting |
| the accumulator to zero, <code class="language-plaintext highlighter-rouge">add</code> for adding another value into the accumulator, |
| and <code class="language-plaintext highlighter-rouge">merge</code> for merging another same-type accumulator into this one. Other methods that must be overridden |
| are contained in the <a href="api/scala/org/apache/spark/util/AccumulatorV2.html">API documentation</a>. For example, supposing we had a <code class="language-plaintext highlighter-rouge">MyVector</code> class |
| representing mathematical vectors, we could write:</p> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">VectorAccumulatorV2</span> <span class="k">extends</span> <span class="nc">AccumulatorV2</span><span class="o">[</span><span class="kt">MyVector</span>, <span class="kt">MyVector</span><span class="o">]</span> <span class="o">{</span> |
| |
| <span class="k">private</span> <span class="k">val</span> <span class="nv">myVector</span><span class="k">:</span> <span class="kt">MyVector</span> <span class="o">=</span> <span class="nv">MyVector</span><span class="o">.</span><span class="py">createZeroVector</span> |
| |
| <span class="k">def</span> <span class="nf">reset</span><span class="o">()</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="nv">myVector</span><span class="o">.</span><span class="py">reset</span><span class="o">()</span> |
| <span class="o">}</span> |
| |
| <span class="k">def</span> <span class="nf">add</span><span class="o">(</span><span class="n">v</span><span class="k">:</span> <span class="kt">MyVector</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="nv">myVector</span><span class="o">.</span><span class="py">add</span><span class="o">(</span><span class="n">v</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| |
| <span class="c1">// Then, create an Accumulator of this type:</span> |
| <span class="k">val</span> <span class="nv">myVectorAcc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">VectorAccumulatorV2</span> |
| <span class="c1">// Then, register it into spark context:</span> |
| <span class="nv">sc</span><span class="o">.</span><span class="py">register</span><span class="o">(</span><span class="n">myVectorAcc</span><span class="o">,</span> <span class="s">"MyVectorAcc1"</span><span class="o">)</span></code></pre></figure> |
| |
| <p>Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.</p> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>A numeric accumulator can be created by calling <code class="language-plaintext highlighter-rouge">SparkContext.longAccumulator()</code> or <code class="language-plaintext highlighter-rouge">SparkContext.doubleAccumulator()</code> |
| to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using |
| the <code class="language-plaintext highlighter-rouge">add</code> method. However, they cannot read its value. Only the driver program can read the accumulator’s value, |
| using its <code class="language-plaintext highlighter-rouge">value</code> method.</p> |
| |
| <p>The code below shows an accumulator being used to add up the elements of an array:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">LongAccumulator</span> <span class="n">accum</span> <span class="o">=</span> <span class="n">jsc</span><span class="o">.</span><span class="na">sc</span><span class="o">().</span><span class="na">longAccumulator</span><span class="o">();</span> |
| |
| <span class="n">jsc</span><span class="o">.</span><span class="na">parallelize</span><span class="o">(</span><span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">)).</span><span class="na">foreach</span><span class="o">(</span><span class="n">x</span> <span class="o">-></span> <span class="n">accum</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">x</span><span class="o">));</span> |
| <span class="c1">// ...</span> |
| <span class="c1">// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s</span> |
| |
| <span class="n">accum</span><span class="o">.</span><span class="na">value</span><span class="o">();</span> |
| <span class="c1">// returns 10</span></code></pre></figure> |
| |
| <p>While this code used the built-in support for accumulators of type Long, programmers can also |
| create their own types by subclassing <a href="api/scala/org/apache/spark/util/AccumulatorV2.html">AccumulatorV2</a>. |
| The AccumulatorV2 abstract class has several methods which one has to override: <code class="language-plaintext highlighter-rouge">reset</code> for resetting |
| the accumulator to zero, <code class="language-plaintext highlighter-rouge">add</code> for adding another value into the accumulator, |
| and <code class="language-plaintext highlighter-rouge">merge</code> for merging another same-type accumulator into this one. Other methods that must be overridden |
| are contained in the <a href="api/scala/org/apache/spark/util/AccumulatorV2.html">API documentation</a>. For example, supposing we had a <code class="language-plaintext highlighter-rouge">MyVector</code> class |
| representing mathematical vectors, we could write:</p> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">VectorAccumulatorV2</span> <span class="kd">extends</span> <span class="nc">AccumulatorV2</span><span class="o"><</span><span class="nc">MyVector</span><span class="o">,</span> <span class="nc">MyVector</span><span class="o">></span> <span class="o">{</span> |
| |
| <span class="kd">private</span> <span class="nc">MyVector</span> <span class="n">myVector</span> <span class="o">=</span> <span class="nc">MyVector</span><span class="o">.</span><span class="na">createZeroVector</span><span class="o">();</span> |
| |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reset</span><span class="o">()</span> <span class="o">{</span> |
| <span class="n">myVector</span><span class="o">.</span><span class="na">reset</span><span class="o">();</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">add</span><span class="o">(</span><span class="nc">MyVector</span> <span class="n">v</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">myVector</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">v</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| |
| <span class="c1">// Then, create an Accumulator of this type:</span> |
| <span class="nc">VectorAccumulatorV2</span> <span class="n">myVectorAcc</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">VectorAccumulatorV2</span><span class="o">();</span> |
| <span class="c1">// Then, register it into spark context:</span> |
| <span class="n">jsc</span><span class="o">.</span><span class="na">sc</span><span class="o">().</span><span class="na">register</span><span class="o">(</span><span class="n">myVectorAcc</span><span class="o">,</span> <span class="s">"MyVectorAcc1"</span><span class="o">);</span></code></pre></figure> |
| |
| <p>Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.</p> |
| |
| <p><em>Warning</em>: When a Spark task finishes, Spark will try to merge the accumulated updates in this task to an accumulator. |
| If it fails, Spark will ignore the failure and still mark the task successful and continue to run other tasks. Hence, |
| a buggy accumulator will not impact a Spark job, but it may not get updated correctly although a Spark job is successful.</p> |
| |
| </div> |
| |
| </div> |
| |
| <p>For accumulator updates performed inside <b>actions only</b>, Spark guarantees that each task’s update to the accumulator |
| will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware |
| that each task’s update may be applied more than once if tasks or job stages are re-executed.</p> |
| |
| <p>Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like <code class="language-plaintext highlighter-rouge">map()</code>. The code fragment below demonstrates this property:</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="python"> |
| |
| <figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">accum</span> <span class="o">=</span> <span class="n">sc</span><span class="p">.</span><span class="n">accumulator</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span> |
| <span class="k">def</span> <span class="nf">g</span><span class="p">(</span><span class="n">x</span><span class="p">):</span> |
| <span class="n">accum</span><span class="p">.</span><span class="n">add</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">f</span><span class="p">(</span><span class="n">x</span><span class="p">)</span> |
| <span class="n">data</span><span class="p">.</span><span class="nb">map</span><span class="p">(</span><span class="n">g</span><span class="p">)</span> |
| <span class="c1"># Here, accum is still 0 because no actions have caused the `map` to be computed.</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="scala"> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">accum</span> <span class="k">=</span> <span class="nv">sc</span><span class="o">.</span><span class="py">longAccumulator</span> |
| <span class="nv">data</span><span class="o">.</span><span class="py">map</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=></span> <span class="nv">accum</span><span class="o">.</span><span class="py">add</span><span class="o">(</span><span class="n">x</span><span class="o">);</span> <span class="n">x</span> <span class="o">}</span> |
| <span class="c1">// Here, accum is still 0 because no actions have caused the map operation to be computed.</span></code></pre></figure> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">LongAccumulator</span> <span class="n">accum</span> <span class="o">=</span> <span class="n">jsc</span><span class="o">.</span><span class="na">sc</span><span class="o">().</span><span class="na">longAccumulator</span><span class="o">();</span> |
| <span class="n">data</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">x</span> <span class="o">-></span> <span class="o">{</span> <span class="n">accum</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">x</span><span class="o">);</span> <span class="k">return</span> <span class="n">f</span><span class="o">(</span><span class="n">x</span><span class="o">);</span> <span class="o">});</span> |
| <span class="c1">// Here, accum is still 0 because no actions have caused the `map` to be computed.</span></code></pre></figure> |
| |
| </div> |
| |
| </div> |
| |
| <h1 id="deploying-to-a-cluster">Deploying to a Cluster</h1> |
| |
| <p>The <a href="submitting-applications.html">application submission guide</a> describes how to submit applications to a cluster. |
| In short, once you package your application into a JAR (for Java/Scala) or a set of <code class="language-plaintext highlighter-rouge">.py</code> or <code class="language-plaintext highlighter-rouge">.zip</code> files (for Python), |
| the <code class="language-plaintext highlighter-rouge">bin/spark-submit</code> script lets you submit it to any supported cluster manager.</p> |
| |
| <h1 id="launching-spark-jobs-from-java--scala">Launching Spark jobs from Java / Scala</h1> |
| |
| <p>The <a href="api/java/index.html?org/apache/spark/launcher/package-summary.html">org.apache.spark.launcher</a> |
| package provides classes for launching Spark jobs as child processes using a simple Java API.</p> |
| |
| <h1 id="unit-testing">Unit Testing</h1> |
| |
| <p>Spark is friendly to unit testing with any popular unit test framework. |
| Simply create a <code class="language-plaintext highlighter-rouge">SparkContext</code> in your test with the master URL set to <code class="language-plaintext highlighter-rouge">local</code>, run your operations, |
| and then call <code class="language-plaintext highlighter-rouge">SparkContext.stop()</code> to tear it down. |
| Make sure you stop the context within a <code class="language-plaintext highlighter-rouge">finally</code> block or the test framework’s <code class="language-plaintext highlighter-rouge">tearDown</code> method, |
| as Spark does not support two contexts running concurrently in the same program.</p> |
| |
| <h1 id="where-to-go-from-here">Where to Go from Here</h1> |
| |
| <p>You can see some <a href="https://spark.apache.org/examples.html">example Spark programs</a> on the Spark website. |
| In addition, Spark includes several samples in the <code class="language-plaintext highlighter-rouge">examples</code> directory |
| (<a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples">Scala</a>, |
| <a href="https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples">Java</a>, |
| <a href="https://github.com/apache/spark/tree/master/examples/src/main/python">Python</a>, |
| <a href="https://github.com/apache/spark/tree/master/examples/src/main/r">R</a>). |
| You can run Java and Scala examples by passing the class name to Spark’s <code class="language-plaintext highlighter-rouge">bin/run-example</code> script; for instance:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/run-example SparkPi |
| </code></pre></div></div> |
| |
| <p>For Python examples, use <code class="language-plaintext highlighter-rouge">spark-submit</code> instead:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-submit examples/src/main/python/pi.py |
| </code></pre></div></div> |
| |
| <p>For R examples, use <code class="language-plaintext highlighter-rouge">spark-submit</code> as well:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-submit examples/src/main/r/dataframe.R |
| </code></pre></div></div> |
| |
| <p>For help on optimizing your programs, the <a href="configuration.html">configuration</a> and |
| <a href="tuning.html">tuning</a> guides provide information on best practices. They are especially important for |
| making sure that your data is stored in memory in an efficient format. |
| For help on deploying, the <a href="cluster-overview.html">cluster mode overview</a> describes the components involved |
| in distributed operation and supported cluster managers.</p> |
| |
| <p>Finally, full API documentation is available in |
| <a href="api/scala/org/apache/spark/">Scala</a>, <a href="api/java/">Java</a>, <a href="api/python/">Python</a> and <a href="api/R/">R</a>.</p> |
| |
| </div> |
| |
| <!-- /container --> |
| </div> |
| |
| <script src="js/vendor/jquery-3.5.1.min.js"></script> |
| <script src="js/vendor/bootstrap.bundle.min.js"></script> |
| |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <script type="text/javascript" src="js/vendor/docsearch.min.js"></script> |
| <script type="text/javascript"> |
| // DocSearch is entirely free and automated. DocSearch is built in two parts: |
| // 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link |
| // in your website and extract content from every page it traverses. It then pushes this |
| // content to an Algolia index. |
| // 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index |
| // to your search input and display its results in a dropdown UI. If you want to find more |
| // details on how works DocSearch, check the docs of DocSearch. |
| docsearch({ |
| apiKey: 'd62f962a82bc9abb53471cb7b89da35e', |
| appId: 'RAI69RXRSK', |
| indexName: 'apache_spark', |
| inputSelector: '#docsearch-input', |
| enhancedSearchInput: true, |
| algoliaOptions: { |
| 'facetFilters': ["version:3.5.0"] |
| }, |
| debug: false // Set debug to true if you want to inspect the dropdown |
| }); |
| |
| </script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function(d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function(){ |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], |
| displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' + |
| '?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |