<!-- blob: 4ea95935d438b2118f2fc18c3632972d81f7008f -->
<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
  <meta charset="utf-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
  <!-- initial-scale=1 added so mobile browsers render at 1:1 zoom; no
       maximum-scale/user-scalable restrictions, so pinch-zoom stays available -->
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>Getting Started - Spark 2.4.7 Documentation</title>
  <!-- Stylesheet order is significant: bootstrap, then responsive overrides,
       then site styles, then syntax-highlighting theme -->
  <link rel="stylesheet" href="css/bootstrap.min.css">
  <style>
    body {
      padding-top: 60px;
      padding-bottom: 40px;
    }
  </style>
  <link rel="stylesheet" href="css/bootstrap-responsive.min.css">
  <link rel="stylesheet" href="css/main.css">
  <!-- Modernizr must load in <head> (before body render) to apply feature classes -->
  <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
  <link rel="stylesheet" href="css/pygments-default.css">
  <!-- Google Analytics (legacy ga.js loader); protocol sniffing kept as-is
       because the snippet's behavior must not change -->
  <script>
  var _gaq = _gaq || [];
  _gaq.push(['_setAccount', 'UA-32518208-2']);
  _gaq.push(['_trackPageview']);
  (function() {
    var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
    ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
    var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
  })();
  </script>
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<!-- Fixed top navigation bar (Bootstrap 2 markup) -->
<div class="navbar navbar-fixed-top" id="topbar">
  <div class="navbar-inner">
    <div class="container">
      <div class="brand">
        <!-- alt text added (was missing); trailing slash on the void <img> removed -->
        <a href="index.html"><img src="img/spark-logo-hd.png" style="height:50px;" alt="Apache Spark"></a><span class="version">2.4.7</span>
      </div>
      <ul class="nav">
        <!--TODO(andyk): Add class="active" attribute to li some how.-->
        <li><a href="index.html">Overview</a></li>
        <li class="dropdown">
          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
          <ul class="dropdown-menu">
            <li><a href="quick-start.html">Quick Start</a></li>
            <!-- typo fixed: "Broadcasts Vars" -> "Broadcast Vars" -->
            <li><a href="rdd-programming-guide.html">RDDs, Accumulators, Broadcast Vars</a></li>
            <li><a href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a></li>
            <li><a href="structured-streaming-programming-guide.html">Structured Streaming</a></li>
            <li><a href="streaming-programming-guide.html">Spark Streaming (DStreams)</a></li>
            <li><a href="ml-guide.html">MLlib (Machine Learning)</a></li>
            <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
            <li><a href="sparkr.html">SparkR (R on Spark)</a></li>
          </ul>
        </li>
        <li class="dropdown">
          <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
          <ul class="dropdown-menu">
            <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
            <li><a href="api/java/index.html">Java</a></li>
            <li><a href="api/python/index.html">Python</a></li>
            <li><a href="api/R/index.html">R</a></li>
            <li><a href="api/sql/index.html">SQL, Built-in Functions</a></li>
          </ul>
        </li>
        <li class="dropdown">
          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
          <ul class="dropdown-menu">
            <li><a href="cluster-overview.html">Overview</a></li>
            <li><a href="submitting-applications.html">Submitting Applications</a></li>
            <li class="divider"></li>
            <li><a href="spark-standalone.html">Spark Standalone</a></li>
            <li><a href="running-on-mesos.html">Mesos</a></li>
            <li><a href="running-on-yarn.html">YARN</a></li>
            <li><a href="running-on-kubernetes.html">Kubernetes</a></li>
          </ul>
        </li>
        <li class="dropdown">
          <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
          <ul class="dropdown-menu">
            <li><a href="configuration.html">Configuration</a></li>
            <li><a href="monitoring.html">Monitoring</a></li>
            <li><a href="tuning.html">Tuning Guide</a></li>
            <li><a href="job-scheduling.html">Job Scheduling</a></li>
            <li><a href="security.html">Security</a></li>
            <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
            <li class="divider"></li>
            <li><a href="building-spark.html">Building Spark</a></li>
            <li><a href="https://spark.apache.org/contributing.html">Contributing to Spark</a></li>
            <li><a href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a></li>
          </ul>
        </li>
      </ul>
      <!--<p class="navbar-text pull-right"><span class="version-text">v2.4.7</span></p>-->
    </div>
  </div>
</div>
<div class="container-wrapper">
<!-- Left sidebar navigation for the Spark SQL Guide.
     Fixed: the sub-section <ul> was a direct child of the outer <ul>
     (invalid — only <li> is permitted there); it now nests inside the
     "Getting Started" <li> as the HTML content model requires. -->
<div class="left-menu-wrapper">
  <div class="left-menu">
    <h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3>
    <ul>
      <li>
        <a href="sql-getting-started.html"><b>Getting Started</b></a>
        <ul>
          <li>
            <a href="sql-getting-started.html#starting-point-sparksession">Starting Point: SparkSession</a>
          </li>
          <li>
            <a href="sql-getting-started.html#creating-dataframes">Creating DataFrames</a>
          </li>
          <li>
            <!-- label aligned with the page heading: "(aka DataFrame Operations)" -->
            <a href="sql-getting-started.html#untyped-dataset-operations-aka-dataframe-operations">Untyped Dataset Operations (aka DataFrame Operations)</a>
          </li>
          <li>
            <a href="sql-getting-started.html#running-sql-queries-programmatically">Running SQL Queries Programmatically</a>
          </li>
          <li>
            <a href="sql-getting-started.html#global-temporary-view">Global Temporary View</a>
          </li>
          <li>
            <a href="sql-getting-started.html#creating-datasets">Creating Datasets</a>
          </li>
          <li>
            <a href="sql-getting-started.html#interoperating-with-rdds">Interoperating with RDDs</a>
          </li>
          <li>
            <a href="sql-getting-started.html#aggregations">Aggregations</a>
          </li>
        </ul>
      </li>
      <li>
        <a href="sql-data-sources.html">Data Sources</a>
      </li>
      <li>
        <a href="sql-performance-tuning.html">Performance Tuning</a>
      </li>
      <li>
        <a href="sql-distributed-sql-engine.html">Distributed SQL Engine</a>
      </li>
      <li>
        <a href="sql-pyspark-pandas-with-arrow.html">PySpark Usage Guide for Pandas with Apache Arrow</a>
      </li>
      <li>
        <a href="sql-migration-guide.html">Migration Guide</a>
      </li>
      <li>
        <a href="sql-reference.html">Reference</a>
      </li>
    </ul>
  </div>
</div>
<input id="nav-trigger" class="nav-trigger" checked type="checkbox">
<label for="nav-trigger"></label>
<div class="content-with-sidebar" id="content">
<h1 class="title">Getting Started</h1>
<ul id="markdown-toc">
<li><a href="#starting-point-sparksession" id="markdown-toc-starting-point-sparksession">Starting Point: SparkSession</a></li>
<li><a href="#creating-dataframes" id="markdown-toc-creating-dataframes">Creating DataFrames</a></li>
<li><a href="#untyped-dataset-operations-aka-dataframe-operations" id="markdown-toc-untyped-dataset-operations-aka-dataframe-operations">Untyped Dataset Operations (aka DataFrame Operations)</a></li>
<li><a href="#running-sql-queries-programmatically" id="markdown-toc-running-sql-queries-programmatically">Running SQL Queries Programmatically</a></li>
<li><a href="#global-temporary-view" id="markdown-toc-global-temporary-view">Global Temporary View</a></li>
<li><a href="#creating-datasets" id="markdown-toc-creating-datasets">Creating Datasets</a></li>
<li><a href="#interoperating-with-rdds" id="markdown-toc-interoperating-with-rdds">Interoperating with RDDs</a> <ul>
<li><a href="#inferring-the-schema-using-reflection" id="markdown-toc-inferring-the-schema-using-reflection">Inferring the Schema Using Reflection</a></li>
<li><a href="#programmatically-specifying-the-schema" id="markdown-toc-programmatically-specifying-the-schema">Programmatically Specifying the Schema</a></li>
</ul>
</li>
<li><a href="#aggregations" id="markdown-toc-aggregations">Aggregations</a> <ul>
<li><a href="#untyped-user-defined-aggregate-functions" id="markdown-toc-untyped-user-defined-aggregate-functions">Untyped User-Defined Aggregate Functions</a></li>
<li><a href="#type-safe-user-defined-aggregate-functions" id="markdown-toc-type-safe-user-defined-aggregate-functions">Type-Safe User-Defined Aggregate Functions</a></li>
</ul>
</li>
</ul>
<h2 id="starting-point-sparksession">Starting Point: SparkSession</h2>
<div class="codetabs">
<div data-lang="scala">
<p>The entry point into all functionality in Spark is the <a href="api/scala/index.html#org.apache.spark.sql.SparkSession"><code>SparkSession</code></a> class. To create a basic <code>SparkSession</code>, just use <code>SparkSession.builder()</code>:</p>
<div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span>
<span class="k">val</span> <span class="n">spark</span> <span class="k">=</span> <span class="nc">SparkSession</span>
<span class="o">.</span><span class="n">builder</span><span class="o">()</span>
<span class="o">.</span><span class="n">appName</span><span class="o">(</span><span class="s">&quot;Spark SQL basic example&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="n">config</span><span class="o">(</span><span class="s">&quot;spark.some.config.option&quot;</span><span class="o">,</span> <span class="s">&quot;some-value&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="n">getOrCreate</span><span class="o">()</span>
<span class="c1">// For implicit conversions like converting RDDs to DataFrames</span>
<span class="k">import</span> <span class="nn">spark.implicits._</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<p>The entry point into all functionality in Spark is the <a href="api/java/index.html#org.apache.spark.sql.SparkSession"><code>SparkSession</code></a> class. To create a basic <code>SparkSession</code>, just use <code>SparkSession.builder()</code>:</p>
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span><span class="o">;</span>
<span class="n">SparkSession</span> <span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>
<span class="o">.</span><span class="na">builder</span><span class="o">()</span>
<span class="o">.</span><span class="na">appName</span><span class="o">(</span><span class="s">&quot;Java Spark SQL basic example&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">config</span><span class="o">(</span><span class="s">&quot;spark.some.config.option&quot;</span><span class="o">,</span> <span class="s">&quot;some-value&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">getOrCreate</span><span class="o">();</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div>
</div>
<div data-lang="python">
<p>The entry point into all functionality in Spark is the <a href="api/python/pyspark.sql.html#pyspark.sql.SparkSession"><code>SparkSession</code></a> class. To create a basic <code>SparkSession</code>, just use <code>SparkSession.builder</code>:</p>
<div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span> \
<span class="o">.</span><span class="n">builder</span> \
<span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">&quot;Python Spark SQL basic example&quot;</span><span class="p">)</span> \
<span class="o">.</span><span class="n">config</span><span class="p">(</span><span class="s2">&quot;spark.some.config.option&quot;</span><span class="p">,</span> <span class="s2">&quot;some-value&quot;</span><span class="p">)</span> \
<span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div>
</div>
<div data-lang="r">
<p>The entry point into all functionality in Spark is the <a href="api/R/sparkR.session.html"><code>SparkSession</code></a> class. To initialize a basic <code>SparkSession</code>, just call <code>sparkR.session()</code>:</p>
<div class="highlight"><pre><span></span>sparkR.session<span class="p">(</span>appName <span class="o">=</span> <span class="s">&quot;R Spark SQL basic example&quot;</span><span class="p">,</span> sparkConfig <span class="o">=</span> <span class="kt">list</span><span class="p">(</span>spark.some.config.option <span class="o">=</span> <span class="s">&quot;some-value&quot;</span><span class="p">))</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div>
<p>Note that when invoked for the first time, <code>sparkR.session()</code> initializes a global <code>SparkSession</code> singleton instance, and always returns a reference to this instance for successive invocations. In this way, users only need to initialize the <code>SparkSession</code> once, then SparkR functions like <code>read.df</code> will be able to access this global instance implicitly, and users don&#8217;t need to pass the <code>SparkSession</code> instance around.</p>
</div>
</div>
<p><code>SparkSession</code> in Spark 2.0 provides builtin support for Hive features including the ability to
write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables.
To use these features, you do not need to have an existing Hive setup.</p>
<h2 id="creating-dataframes">Creating DataFrames</h2>
<div class="codetabs">
<div data-lang="scala">
<p>With a <code>SparkSession</code>, applications can create DataFrames from an <a href="#interoperating-with-rdds">existing <code>RDD</code></a>,
from a Hive table, or from <a href="sql-data-sources.html">Spark data sources</a>.</p>
<p>As an example, the following creates a DataFrame based on the content of a JSON file:</p>
<div class="highlight"><pre><span></span><span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/people.json&quot;</span><span class="o">)</span>
<span class="c1">// Displays the content of the DataFrame to stdout</span>
<span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<p>With a <code>SparkSession</code>, applications can create DataFrames from an <a href="#interoperating-with-rdds">existing <code>RDD</code></a>,
from a Hive table, or from <a href="sql-data-sources.html">Spark data sources</a>.</p>
<p>As an example, the following creates a DataFrame based on the content of a JSON file:</p>
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/people.json&quot;</span><span class="o">);</span>
<span class="c1">// Displays the content of the DataFrame to stdout</span>
<span class="n">df</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div>
</div>
<div data-lang="python">
<p>With a <code>SparkSession</code>, applications can create DataFrames from an <a href="#interoperating-with-rdds">existing <code>RDD</code></a>,
from a Hive table, or from <a href="sql-data-sources.html">Spark data sources</a>.</p>
<p>As an example, the following creates a DataFrame based on the content of a JSON file:</p>
<div class="highlight"><pre><span></span><span class="c1"># spark is an existing SparkSession</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="p">(</span><span class="s2">&quot;examples/src/main/resources/people.json&quot;</span><span class="p">)</span>
<span class="c1"># Displays the content of the DataFrame to stdout</span>
<span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># | age| name|</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># |null|Michael|</span>
<span class="c1"># | 30| Andy|</span>
<span class="c1"># | 19| Justin|</span>
<span class="c1"># +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div>
</div>
<div data-lang="r">
<p>With a <code>SparkSession</code>, applications can create DataFrames from a local R data.frame,
from a Hive table, or from <a href="sql-data-sources.html">Spark data sources</a>.</p>
<p>As an example, the following creates a DataFrame based on the content of a JSON file:</p>
<div class="highlight"><pre><span></span>df <span class="o">&lt;-</span> read.json<span class="p">(</span><span class="s">&quot;examples/src/main/resources/people.json&quot;</span><span class="p">)</span>
<span class="c1"># Displays the content of the DataFrame</span>
<span class="kp">head</span><span class="p">(</span>df<span class="p">)</span>
<span class="c1">## age name</span>
<span class="c1">## 1 NA Michael</span>
<span class="c1">## 2 30 Andy</span>
<span class="c1">## 3 19 Justin</span>
<span class="c1"># Another method to print the first few rows and optionally truncate the printing of long values</span>
showDF<span class="p">(</span>df<span class="p">)</span>
<span class="c1">## +----+-------+</span>
<span class="c1">## | age| name|</span>
<span class="c1">## +----+-------+</span>
<span class="c1">## |null|Michael|</span>
<span class="c1">## | 30| Andy|</span>
<span class="c1">## | 19| Justin|</span>
<span class="c1">## +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div>
</div>
</div>
<h2 id="untyped-dataset-operations-aka-dataframe-operations">Untyped Dataset Operations (aka DataFrame Operations)</h2>
<p>DataFrames provide a domain-specific language for structured data manipulation in <a href="api/scala/index.html#org.apache.spark.sql.Dataset">Scala</a>, <a href="api/java/index.html?org/apache/spark/sql/Dataset.html">Java</a>, <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">Python</a> and <a href="api/R/SparkDataFrame.html">R</a>.</p>
<p>As mentioned above, in Spark 2.0, DataFrames are just Datasets of <code>Row</code>s in the Scala and Java APIs. These operations are also referred to as &#8220;untyped transformations&#8221; in contrast to the &#8220;typed transformations&#8221; that come with strongly typed Scala/Java Datasets.</p>
<p>Here we include some basic examples of structured data processing using Datasets:</p>
<div class="codetabs">
<div data-lang="scala">
<div class="highlight"><pre><span></span><span class="c1">// This import is needed to use the $-notation</span>
<span class="k">import</span> <span class="nn">spark.implicits._</span>
<span class="c1">// Print the schema in a tree format</span>
<span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span>
<span class="c1">// root</span>
<span class="c1">// |-- age: long (nullable = true)</span>
<span class="c1">// |-- name: string (nullable = true)</span>
<span class="c1">// Select only the &quot;name&quot; column</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +-------+</span>
<span class="c1">// | name|</span>
<span class="c1">// +-------+</span>
<span class="c1">// |Michael|</span>
<span class="c1">// | Andy|</span>
<span class="c1">// | Justin|</span>
<span class="c1">// +-------+</span>
<span class="c1">// Select everybody, but increment the age by 1</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="n">$</span><span class="s">&quot;name&quot;</span><span class="o">,</span> <span class="n">$</span><span class="s">&quot;age&quot;</span> <span class="o">+</span> <span class="mi">1</span><span class="o">).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +-------+---------+</span>
<span class="c1">// | name|(age + 1)|</span>
<span class="c1">// +-------+---------+</span>
<span class="c1">// |Michael| null|</span>
<span class="c1">// | Andy| 31|</span>
<span class="c1">// | Justin| 20|</span>
<span class="c1">// +-------+---------+</span>
<span class="c1">// Select people older than 21</span>
<span class="n">df</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">$</span><span class="s">&quot;age&quot;</span> <span class="o">&gt;</span> <span class="mi">21</span><span class="o">).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +---+----+</span>
<span class="c1">// |age|name|</span>
<span class="c1">// +---+----+</span>
<span class="c1">// | 30|Andy|</span>
<span class="c1">// +---+----+</span>
<span class="c1">// Count people by age</span>
<span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;age&quot;</span><span class="o">).</span><span class="n">count</span><span class="o">().</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +----+-----+</span>
<span class="c1">// | age|count|</span>
<span class="c1">// +----+-----+</span>
<span class="c1">// | 19| 1|</span>
<span class="c1">// |null| 1|</span>
<span class="c1">// | 30| 1|</span>
<span class="c1">// +----+-----+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div>
<p>For a complete list of the types of operations that can be performed on a Dataset refer to the <a href="api/scala/index.html#org.apache.spark.sql.Dataset">API Documentation</a>.</p>
<p>In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/scala/index.html#org.apache.spark.sql.functions$">DataFrame Function Reference</a>.</p>
</div>
<div data-lang="java">
<div class="highlight"><pre><span></span><span class="c1">// col(&quot;...&quot;) is preferable to df.col(&quot;...&quot;)</span>
<span class="kn">import static</span> <span class="nn">org.apache.spark.sql.functions.col</span><span class="o">;</span>
<span class="c1">// Print the schema in a tree format</span>
<span class="n">df</span><span class="o">.</span><span class="na">printSchema</span><span class="o">();</span>
<span class="c1">// root</span>
<span class="c1">// |-- age: long (nullable = true)</span>
<span class="c1">// |-- name: string (nullable = true)</span>
<span class="c1">// Select only the &quot;name&quot; column</span>
<span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">).</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +-------+</span>
<span class="c1">// | name|</span>
<span class="c1">// +-------+</span>
<span class="c1">// |Michael|</span>
<span class="c1">// | Andy|</span>
<span class="c1">// | Justin|</span>
<span class="c1">// +-------+</span>
<span class="c1">// Select everybody, but increment the age by 1</span>
<span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">),</span> <span class="n">col</span><span class="o">(</span><span class="s">&quot;age&quot;</span><span class="o">).</span><span class="na">plus</span><span class="o">(</span><span class="mi">1</span><span class="o">)).</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +-------+---------+</span>
<span class="c1">// | name|(age + 1)|</span>
<span class="c1">// +-------+---------+</span>
<span class="c1">// |Michael| null|</span>
<span class="c1">// | Andy| 31|</span>
<span class="c1">// | Justin| 20|</span>
<span class="c1">// +-------+---------+</span>
<span class="c1">// Select people older than 21</span>
<span class="n">df</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">&quot;age&quot;</span><span class="o">).</span><span class="na">gt</span><span class="o">(</span><span class="mi">21</span><span class="o">)).</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +---+----+</span>
<span class="c1">// |age|name|</span>
<span class="c1">// +---+----+</span>
<span class="c1">// | 30|Andy|</span>
<span class="c1">// +---+----+</span>
<span class="c1">// Count people by age</span>
<span class="n">df</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;age&quot;</span><span class="o">).</span><span class="na">count</span><span class="o">().</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +----+-----+</span>
<span class="c1">// | age|count|</span>
<span class="c1">// +----+-----+</span>
<span class="c1">// | 19| 1|</span>
<span class="c1">// |null| 1|</span>
<span class="c1">// | 30| 1|</span>
<span class="c1">// +----+-----+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div>
<p>For a complete list of the types of operations that can be performed on a Dataset refer to the <a href="api/java/org/apache/spark/sql/Dataset.html">API Documentation</a>.</p>
<p>In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/java/org/apache/spark/sql/functions.html">DataFrame Function Reference</a>.</p>
</div>
<div data-lang="python">
<p>In Python, it&#8217;s possible to access a DataFrame&#8217;s columns either by attribute
(<code>df.age</code>) or by indexing (<code>df['age']</code>). While the former is convenient for
interactive data exploration, users are highly encouraged to use the
latter form, which is future proof and won&#8217;t break with column names that
are also attributes on the DataFrame class.</p>
<div class="highlight"><pre><span></span><span class="c1"># spark, df are from the previous example</span>
<span class="c1"># Print the schema in a tree format</span>
<span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="c1"># root</span>
<span class="c1"># |-- age: long (nullable = true)</span>
<span class="c1"># |-- name: string (nullable = true)</span>
<span class="c1"># Select only the &quot;name&quot; column</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s2">&quot;name&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-------+</span>
<span class="c1"># | name|</span>
<span class="c1"># +-------+</span>
<span class="c1"># |Michael|</span>
<span class="c1"># | Andy|</span>
<span class="c1"># | Justin|</span>
<span class="c1"># +-------+</span>
<span class="c1"># Select everybody, but increment the age by 1</span>
<span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s1">&#39;name&#39;</span><span class="p">],</span> <span class="n">df</span><span class="p">[</span><span class="s1">&#39;age&#39;</span><span class="p">]</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-------+---------+</span>
<span class="c1"># | name|(age + 1)|</span>
<span class="c1"># +-------+---------+</span>
<span class="c1"># |Michael| null|</span>
<span class="c1"># | Andy| 31|</span>
<span class="c1"># | Justin| 20|</span>
<span class="c1"># +-------+---------+</span>
<span class="c1"># Select people older than 21</span>
<span class="n">df</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s1">&#39;age&#39;</span><span class="p">]</span> <span class="o">&gt;</span> <span class="mi">21</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +---+----+</span>
<span class="c1"># |age|name|</span>
<span class="c1"># +---+----+</span>
<span class="c1"># | 30|Andy|</span>
<span class="c1"># +---+----+</span>
<span class="c1"># Count people by age</span>
<span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="p">(</span><span class="s2">&quot;age&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">count</span><span class="p">()</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +----+-----+</span>
<span class="c1"># | age|count|</span>
<span class="c1"># +----+-----+</span>
<span class="c1"># | 19| 1|</span>
<span class="c1"># |null| 1|</span>
<span class="c1"># | 30| 1|</span>
<span class="c1"># +----+-----+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div>
<p>For a complete list of the types of operations that can be performed on a DataFrame refer to the <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">API Documentation</a>.</p>
<p>In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/python/pyspark.sql.html#module-pyspark.sql.functions">DataFrame Function Reference</a>.</p>
</div>
<div data-lang="r">
<div class="highlight"><pre><span></span><span class="c1"># Create the DataFrame</span>
df <span class="o">&lt;-</span> read.json<span class="p">(</span><span class="s">&quot;examples/src/main/resources/people.json&quot;</span><span class="p">)</span>
<span class="c1"># Show the content of the DataFrame</span>
<span class="kp">head</span><span class="p">(</span>df<span class="p">)</span>
<span class="c1">## age name</span>
<span class="c1">## 1 NA Michael</span>
<span class="c1">## 2 30 Andy</span>
<span class="c1">## 3 19 Justin</span>
<span class="c1"># Print the schema in a tree format</span>
printSchema<span class="p">(</span>df<span class="p">)</span>
<span class="c1">## root</span>
<span class="c1">## |-- age: long (nullable = true)</span>
<span class="c1">## |-- name: string (nullable = true)</span>
<span class="c1"># Select only the &quot;name&quot; column</span>
<span class="kp">head</span><span class="p">(</span>select<span class="p">(</span>df<span class="p">,</span> <span class="s">&quot;name&quot;</span><span class="p">))</span>
<span class="c1">## name</span>
<span class="c1">## 1 Michael</span>
<span class="c1">## 2 Andy</span>
<span class="c1">## 3 Justin</span>
<span class="c1"># Select everybody, but increment the age by 1</span>
<span class="kp">head</span><span class="p">(</span>select<span class="p">(</span>df<span class="p">,</span> df<span class="o">$</span>name<span class="p">,</span> df<span class="o">$</span>age <span class="o">+</span> <span class="m">1</span><span class="p">))</span>
<span class="c1">## name (age + 1.0)</span>
<span class="c1">## 1 Michael NA</span>
<span class="c1">## 2 Andy 31</span>
<span class="c1">## 3 Justin 20</span>
<span class="c1"># Select people older than 21</span>
<span class="kp">head</span><span class="p">(</span>where<span class="p">(</span>df<span class="p">,</span> df<span class="o">$</span>age <span class="o">&gt;</span> <span class="m">21</span><span class="p">))</span>
<span class="c1">## age name</span>
<span class="c1">## 1 30 Andy</span>
<span class="c1"># Count people by age</span>
<span class="kp">head</span><span class="p">(</span>count<span class="p">(</span>groupBy<span class="p">(</span>df<span class="p">,</span> <span class="s">&quot;age&quot;</span><span class="p">)))</span>
<span class="c1">## age count</span>
<span class="c1">## 1 19 1</span>
<span class="c1">## 2 NA 1</span>
<span class="c1">## 3 30 1</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div>
<p>For a complete list of the types of operations that can be performed on a DataFrame refer to the <a href="api/R/index.html">API Documentation</a>.</p>
<p>In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/R/SparkDataFrame.html">DataFrame Function Reference</a>.</p>
</div>
</div>
<h2 id="running-sql-queries-programmatically">Running SQL Queries Programmatically</h2>
<div class="codetabs">
<div data-lang="scala">
<p>The <code>sql</code> function on a <code>SparkSession</code> enables applications to run SQL queries programmatically and returns the result as a <code>DataFrame</code>.</p>
<div class="highlight"><pre><span></span><span class="c1">// Register the DataFrame as a SQL temporary view</span>
<span class="n">df</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;people&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">sqlDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;SELECT * FROM people&quot;</span><span class="o">)</span>
<span class="n">sqlDF</span><span class="o">.</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<p>The <code>sql</code> function on a <code>SparkSession</code> enables applications to run SQL queries programmatically and returns the result as a <code>Dataset&lt;Row&gt;</code>.</p>
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span>
<span class="c1">// Register the DataFrame as a SQL temporary view</span>
<span class="n">df</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;people&quot;</span><span class="o">);</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">sqlDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">&quot;SELECT * FROM people&quot;</span><span class="o">);</span>
<span class="n">sqlDF</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div>
</div>
<div data-lang="python">
<p>The <code>sql</code> function on a <code>SparkSession</code> enables applications to run SQL queries programmatically and returns the result as a <code>DataFrame</code>.</p>
<div class="highlight"><pre><span></span><span class="c1"># Register the DataFrame as a SQL temporary view</span>
<span class="n">df</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="p">(</span><span class="s2">&quot;people&quot;</span><span class="p">)</span>
<span class="n">sqlDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT * FROM people&quot;</span><span class="p">)</span>
<span class="n">sqlDF</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># | age| name|</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># |null|Michael|</span>
<span class="c1"># | 30| Andy|</span>
<span class="c1"># | 19| Justin|</span>
<span class="c1"># +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div>
</div>
<div data-lang="r">
<p>The <code>sql</code> function enables applications to run SQL queries programmatically and returns the result as a <code>SparkDataFrame</code>.</p>
<div class="highlight"><pre><span></span>df <span class="o">&lt;-</span> sql<span class="p">(</span><span class="s">&quot;SELECT * FROM table&quot;</span><span class="p">)</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div>
</div>
</div>
<h2 id="global-temporary-view">Global Temporary View</h2>
<p>Temporary views in Spark SQL are session-scoped and will disappear if the session that created
them terminates. If you want to have a temporary view that is shared among all sessions and kept alive
until the Spark application terminates, you can create a global temporary view. A global temporary
view is tied to a system preserved database <code>global_temp</code>, and we must use the qualified name to
refer to it, e.g. <code>SELECT * FROM global_temp.view1</code>.</p>
<div class="codetabs">
<div data-lang="scala">
<div class="highlight"><pre><span></span><span class="c1">// Register the DataFrame as a global temporary view</span>
<span class="n">df</span><span class="o">.</span><span class="n">createGlobalTempView</span><span class="o">(</span><span class="s">&quot;people&quot;</span><span class="o">)</span>
<span class="c1">// Global temporary view is tied to a system preserved database `global_temp`</span>
<span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;SELECT * FROM global_temp.people&quot;</span><span class="o">).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// Global temporary view is cross-session</span>
<span class="n">spark</span><span class="o">.</span><span class="n">newSession</span><span class="o">().</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;SELECT * FROM global_temp.people&quot;</span><span class="o">).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<div class="highlight"><pre><span></span><span class="c1">// Register the DataFrame as a global temporary view</span>
<span class="n">df</span><span class="o">.</span><span class="na">createGlobalTempView</span><span class="o">(</span><span class="s">&quot;people&quot;</span><span class="o">);</span>
<span class="c1">// Global temporary view is tied to a system preserved database `global_temp`</span>
<span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">&quot;SELECT * FROM global_temp.people&quot;</span><span class="o">).</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// Global temporary view is cross-session</span>
<span class="n">spark</span><span class="o">.</span><span class="na">newSession</span><span class="o">().</span><span class="na">sql</span><span class="o">(</span><span class="s">&quot;SELECT * FROM global_temp.people&quot;</span><span class="o">).</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div>
</div>
<div data-lang="python">
<div class="highlight"><pre><span></span><span class="c1"># Register the DataFrame as a global temporary view</span>
<span class="n">df</span><span class="o">.</span><span class="n">createGlobalTempView</span><span class="p">(</span><span class="s2">&quot;people&quot;</span><span class="p">)</span>
<span class="c1"># Global temporary view is tied to a system preserved database `global_temp`</span>
<span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT * FROM global_temp.people&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># | age| name|</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># |null|Michael|</span>
<span class="c1"># | 30| Andy|</span>
<span class="c1"># | 19| Justin|</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># Global temporary view is cross-session</span>
<span class="n">spark</span><span class="o">.</span><span class="n">newSession</span><span class="p">()</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT * FROM global_temp.people&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># | age| name|</span>
<span class="c1"># +----+-------+</span>
<span class="c1"># |null|Michael|</span>
<span class="c1"># | 30| Andy|</span>
<span class="c1"># | 19| Justin|</span>
<span class="c1"># +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div>
</div>
<div data-lang="sql">
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span></span><span class="k">CREATE</span> <span class="k">GLOBAL</span> <span class="k">TEMPORARY</span> <span class="k">VIEW</span> <span class="n">temp_view</span> <span class="k">AS</span> <span class="k">SELECT</span> <span class="n">a</span> <span class="o">+</span> <span class="mi">1</span><span class="p">,</span> <span class="n">b</span> <span class="o">*</span> <span class="mi">2</span> <span class="k">FROM</span> <span class="n">tbl</span>
<span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">global_temp</span><span class="p">.</span><span class="n">temp_view</span></code></pre></figure>
</div>
</div>
<h2 id="creating-datasets">Creating Datasets</h2>
<p>Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use
a specialized <a href="api/scala/index.html#org.apache.spark.sql.Encoder">Encoder</a> to serialize the objects
for processing or transmitting over the network. While both encoders and standard serialization are
responsible for turning an object into bytes, encoders are code generated dynamically and use a format
that allows Spark to perform many operations like filtering, sorting and hashing without deserializing
the bytes back into an object.</p>
<div class="codetabs">
<div data-lang="scala">
<div class="highlight"><pre><span></span><span class="k">case</span> <span class="k">class</span> <span class="nc">Person</span><span class="o">(</span><span class="n">name</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">age</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
<span class="c1">// Encoders are created for case classes</span>
<span class="k">val</span> <span class="n">caseClassDS</span> <span class="k">=</span> <span class="nc">Seq</span><span class="o">(</span><span class="nc">Person</span><span class="o">(</span><span class="s">&quot;Andy&quot;</span><span class="o">,</span> <span class="mi">32</span><span class="o">)).</span><span class="n">toDS</span><span class="o">()</span>
<span class="n">caseClassDS</span><span class="o">.</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +----+---+</span>
<span class="c1">// |name|age|</span>
<span class="c1">// +----+---+</span>
<span class="c1">// |Andy| 32|</span>
<span class="c1">// +----+---+</span>
<span class="c1">// Encoders for most common types are automatically provided by importing spark.implicits._</span>
<span class="k">val</span> <span class="n">primitiveDS</span> <span class="k">=</span> <span class="nc">Seq</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">).</span><span class="n">toDS</span><span class="o">()</span>
<span class="n">primitiveDS</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span> <span class="o">+</span> <span class="mi">1</span><span class="o">).</span><span class="n">collect</span><span class="o">()</span> <span class="c1">// Returns: Array(2, 3, 4)</span>
<span class="c1">// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name</span>
<span class="k">val</span> <span class="n">path</span> <span class="k">=</span> <span class="s">&quot;examples/src/main/resources/people.json&quot;</span>
<span class="k">val</span> <span class="n">peopleDS</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="n">path</span><span class="o">).</span><span class="n">as</span><span class="o">[</span><span class="kt">Person</span><span class="o">]</span>
<span class="n">peopleDS</span><span class="o">.</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.Arrays</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Collections</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.io.Serializable</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.MapFunction</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoders</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Person</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">age</span><span class="o">;</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">getName</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">name</span><span class="o">;</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">setName</span><span class="o">(</span><span class="n">String</span> <span class="n">name</span><span class="o">)</span> <span class="o">{</span>
<span class="k">this</span><span class="o">.</span><span class="na">name</span> <span class="o">=</span> <span class="n">name</span><span class="o">;</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="nf">getAge</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">age</span><span class="o">;</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">setAge</span><span class="o">(</span><span class="kt">int</span> <span class="n">age</span><span class="o">)</span> <span class="o">{</span>
<span class="k">this</span><span class="o">.</span><span class="na">age</span> <span class="o">=</span> <span class="n">age</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// Create an instance of a Bean class</span>
<span class="n">Person</span> <span class="n">person</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Person</span><span class="o">();</span>
<span class="n">person</span><span class="o">.</span><span class="na">setName</span><span class="o">(</span><span class="s">&quot;Andy&quot;</span><span class="o">);</span>
<span class="n">person</span><span class="o">.</span><span class="na">setAge</span><span class="o">(</span><span class="mi">32</span><span class="o">);</span>
<span class="c1">// Encoders are created for Java beans</span>
<span class="n">Encoder</span><span class="o">&lt;</span><span class="n">Person</span><span class="o">&gt;</span> <span class="n">personEncoder</span> <span class="o">=</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">bean</span><span class="o">(</span><span class="n">Person</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Person</span><span class="o">&gt;</span> <span class="n">javaBeanDS</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataset</span><span class="o">(</span>
<span class="n">Collections</span><span class="o">.</span><span class="na">singletonList</span><span class="o">(</span><span class="n">person</span><span class="o">),</span>
<span class="n">personEncoder</span>
<span class="o">);</span>
<span class="n">javaBeanDS</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +---+----+</span>
<span class="c1">// |age|name|</span>
<span class="c1">// +---+----+</span>
<span class="c1">// | 32|Andy|</span>
<span class="c1">// +---+----+</span>
<span class="c1">// Encoders for most common types are provided in class Encoders</span>
<span class="n">Encoder</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">integerEncoder</span> <span class="o">=</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">INT</span><span class="o">();</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">primitiveDS</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataset</span><span class="o">(</span><span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">),</span> <span class="n">integerEncoder</span><span class="o">);</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">transformedDS</span> <span class="o">=</span> <span class="n">primitiveDS</span><span class="o">.</span><span class="na">map</span><span class="o">(</span>
<span class="o">(</span><span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;)</span> <span class="n">value</span> <span class="o">-&gt;</span> <span class="n">value</span> <span class="o">+</span> <span class="mi">1</span><span class="o">,</span>
<span class="n">integerEncoder</span><span class="o">);</span>
<span class="n">transformedDS</span><span class="o">.</span><span class="na">collect</span><span class="o">();</span> <span class="c1">// Returns [2, 3, 4]</span>
<span class="c1">// DataFrames can be converted to a Dataset by providing a class. Mapping based on name</span>
<span class="n">String</span> <span class="n">path</span> <span class="o">=</span> <span class="s">&quot;examples/src/main/resources/people.json&quot;</span><span class="o">;</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Person</span><span class="o">&gt;</span> <span class="n">peopleDS</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="n">path</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="n">personEncoder</span><span class="o">);</span>
<span class="n">peopleDS</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// | age| name|</span>
<span class="c1">// +----+-------+</span>
<span class="c1">// |null|Michael|</span>
<span class="c1">// | 30| Andy|</span>
<span class="c1">// | 19| Justin|</span>
<span class="c1">// +----+-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div>
</div>
</div>
<h2 id="interoperating-with-rdds">Interoperating with RDDs</h2>
<p>Spark SQL supports two different methods for converting existing RDDs into Datasets. The first
method uses reflection to infer the schema of an RDD that contains specific types of objects. This
reflection-based approach leads to more concise code and works well when you already know the schema
while writing your Spark application.</p>
<p>The second method for creating Datasets is through a programmatic interface that allows you to
construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows
you to construct Datasets when the columns and their types are not known until runtime.</p>
<h3 id="inferring-the-schema-using-reflection">Inferring the Schema Using Reflection</h3>
<div class="codetabs">
<div data-lang="scala">
<p>The Scala interface for Spark SQL supports automatically converting an RDD containing case classes
to a DataFrame. The case class
defines the schema of the table. The names of the arguments to the case class are read using
reflection and become the names of the columns. Case classes can also be nested or contain complex
types such as <code>Seq</code>s or <code>Array</code>s. This RDD can be implicitly converted to a DataFrame and then be
registered as a table. Tables can be used in subsequent SQL statements.</p>
<div class="highlight"><pre><span></span><span class="c1">// For implicit conversions from RDDs to DataFrames</span>
<span class="k">import</span> <span class="nn">spark.implicits._</span>
<span class="c1">// Create an RDD of Person objects from a text file, convert it to a Dataframe</span>
<span class="k">val</span> <span class="n">peopleDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span>
<span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/people.txt&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">))</span>
<span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">attributes</span> <span class="k">=&gt;</span> <span class="nc">Person</span><span class="o">(</span><span class="n">attributes</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">attributes</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">trim</span><span class="o">.</span><span class="n">toInt</span><span class="o">))</span>
<span class="o">.</span><span class="n">toDF</span><span class="o">()</span>
<span class="c1">// Register the DataFrame as a temporary view</span>
<span class="n">peopleDF</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;people&quot;</span><span class="o">)</span>
<span class="c1">// SQL statements can be run by using the sql methods provided by Spark</span>
<span class="k">val</span> <span class="n">teenagersDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;SELECT name, age FROM people WHERE age BETWEEN 13 AND 19&quot;</span><span class="o">)</span>
<span class="c1">// The columns of a row in the result can be accessed by field index</span>
<span class="n">teenagersDF</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">teenager</span> <span class="k">=&gt;</span> <span class="s">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">teenager</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +------------+</span>
<span class="c1">// | value|</span>
<span class="c1">// +------------+</span>
<span class="c1">// |Name: Justin|</span>
<span class="c1">// +------------+</span>
<span class="c1">// or by field name</span>
<span class="n">teenagersDF</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">teenager</span> <span class="k">=&gt;</span> <span class="s">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">teenager</span><span class="o">.</span><span class="n">getAs</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">&quot;name&quot;</span><span class="o">)).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +------------+</span>
<span class="c1">// | value|</span>
<span class="c1">// +------------+</span>
<span class="c1">// |Name: Justin|</span>
<span class="c1">// +------------+</span>
<span class="c1">// No pre-defined encoders for Dataset[Map[K,V]], define explicitly</span>
<span class="k">implicit</span> <span class="k">val</span> <span class="n">mapEncoder</span> <span class="k">=</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">Encoders</span><span class="o">.</span><span class="n">kryo</span><span class="o">[</span><span class="kt">Map</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Any</span><span class="o">]]</span>
<span class="c1">// Primitive types and case classes can be also defined as</span>
<span class="c1">// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()</span>
<span class="c1">// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]</span>
<span class="n">teenagersDF</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">teenager</span> <span class="k">=&gt;</span> <span class="n">teenager</span><span class="o">.</span><span class="n">getValuesMap</span><span class="o">[</span><span class="kt">Any</span><span class="o">](</span><span class="nc">List</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">,</span> <span class="s">&quot;age&quot;</span><span class="o">))).</span><span class="n">collect</span><span class="o">()</span>
<span class="c1">// Array(Map(&quot;name&quot; -&gt; &quot;Justin&quot;, &quot;age&quot; -&gt; 19))</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<p>Spark SQL supports automatically converting an RDD of
<a href="https://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly">JavaBeans</a> into a DataFrame.
The <code>BeanInfo</code>, obtained using reflection, defines the schema of the table. Currently, Spark SQL
does not support JavaBeans that contain <code>Map</code> field(s). Nested JavaBeans and <code>List</code> or <code>Array</code>
fields are supported though. You can create a JavaBean by creating a class that implements
Serializable and has getters and setters for all of its fields.</p>
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.api.java.JavaRDD</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.Function</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.MapFunction</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoders</span><span class="o">;</span>
<span class="c1">// Create an RDD of Person objects from a text file</span>
<span class="n">JavaRDD</span><span class="o">&lt;</span><span class="n">Person</span><span class="o">&gt;</span> <span class="n">peopleRDD</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">()</span>
<span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/people.txt&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">javaRDD</span><span class="o">()</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">line</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">String</span><span class="o">[]</span> <span class="n">parts</span> <span class="o">=</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">);</span>
<span class="n">Person</span> <span class="n">person</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Person</span><span class="o">();</span>
<span class="n">person</span><span class="o">.</span><span class="na">setName</span><span class="o">(</span><span class="n">parts</span><span class="o">[</span><span class="mi">0</span><span class="o">]);</span>
<span class="n">person</span><span class="o">.</span><span class="na">setAge</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">parts</span><span class="o">[</span><span class="mi">1</span><span class="o">].</span><span class="na">trim</span><span class="o">()));</span>
<span class="k">return</span> <span class="n">person</span><span class="o">;</span>
<span class="o">});</span>
<span class="c1">// Apply a schema to an RDD of JavaBeans to get a DataFrame</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">peopleDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">peopleRDD</span><span class="o">,</span> <span class="n">Person</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="c1">// Register the DataFrame as a temporary view</span>
<span class="n">peopleDF</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;people&quot;</span><span class="o">);</span>
<span class="c1">// SQL statements can be run by using the sql methods provided by spark</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">teenagersDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">&quot;SELECT name FROM people WHERE age BETWEEN 13 AND 19&quot;</span><span class="o">);</span>
<span class="c1">// The columns of a row in the result can be accessed by field index</span>
<span class="n">Encoder</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">stringEncoder</span> <span class="o">=</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">STRING</span><span class="o">();</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">teenagerNamesByIndexDF</span> <span class="o">=</span> <span class="n">teenagersDF</span><span class="o">.</span><span class="na">map</span><span class="o">(</span>
<span class="o">(</span><span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;)</span> <span class="n">row</span> <span class="o">-&gt;</span> <span class="s">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span>
<span class="n">stringEncoder</span><span class="o">);</span>
<span class="n">teenagerNamesByIndexDF</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +------------+</span>
<span class="c1">// | value|</span>
<span class="c1">// +------------+</span>
<span class="c1">// |Name: Justin|</span>
<span class="c1">// +------------+</span>
<span class="c1">// or by field name</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">teenagerNamesByFieldDF</span> <span class="o">=</span> <span class="n">teenagersDF</span><span class="o">.</span><span class="na">map</span><span class="o">(</span>
<span class="o">(</span><span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;)</span> <span class="n">row</span> <span class="o">-&gt;</span> <span class="s">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">row</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">getAs</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">),</span>
<span class="n">stringEncoder</span><span class="o">);</span>
<span class="n">teenagerNamesByFieldDF</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +------------+</span>
<span class="c1">// | value|</span>
<span class="c1">// +------------+</span>
<span class="c1">// |Name: Justin|</span>
<span class="c1">// +------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div>
</div>
<div data-lang="python">
<p>Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of
key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table,
and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.</p>
<div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">Row</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span>
<span class="c1"># Load a text file and convert each line to a Row.</span>
<span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s2">&quot;examples/src/main/resources/people.txt&quot;</span><span class="p">)</span>
<span class="n">parts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">l</span><span class="p">:</span> <span class="n">l</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">&quot;,&quot;</span><span class="p">))</span>
<span class="n">people</span> <span class="o">=</span> <span class="n">parts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">p</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">age</span><span class="o">=</span><span class="nb">int</span><span class="p">(</span><span class="n">p</span><span class="p">[</span><span class="mi">1</span><span class="p">])))</span>
<span class="c1"># Infer the schema, and register the DataFrame as a table.</span>
<span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">people</span><span class="p">)</span>
<span class="n">schemaPeople</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="p">(</span><span class="s2">&quot;people&quot;</span><span class="p">)</span>
<span class="c1"># SQL can be run over DataFrames that have been registered as a table.</span>
<span class="n">teenagers</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT name FROM people WHERE age &gt;= 13 AND age &lt;= 19&quot;</span><span class="p">)</span>
<span class="c1"># The results of SQL queries are DataFrame objects.</span>
<span class="c1"># rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.</span>
<span class="n">teenNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="n">rdd</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="s2">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="n">name</span><span class="p">)</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span>
<span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">teenNames</span><span class="p">:</span>
<span class="k">print</span><span class="p">(</span><span class="n">name</span><span class="p">)</span>
<span class="c1"># Name: Justin</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div>
</div>
</div>
<h3 id="programmatically-specifying-the-schema">Programmatically Specifying the Schema</h3>
<div class="codetabs">
<div data-lang="scala">
<p>When case classes cannot be defined ahead of time (for example,
the structure of records is encoded in a string, or a text dataset will be parsed
and fields will be projected differently for different users),
a <code>DataFrame</code> can be created programmatically with three steps.</p>
<ol>
<li>Create an RDD of <code>Row</code>s from the original RDD;</li>
<li>Create the schema represented by a <code>StructType</code> matching the structure of
<code>Row</code>s in the RDD created in Step 1.</li>
<li>Apply the schema to the RDD of <code>Row</code>s via <code>createDataFrame</code> method provided
by <code>SparkSession</code>.</li>
</ol>
<p>For example:</p>
<div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.types._</span>
<span class="c1">// Create an RDD</span>
<span class="k">val</span> <span class="n">peopleRDD</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/people.txt&quot;</span><span class="o">)</span>
<span class="c1">// The schema is encoded in a string</span>
<span class="k">val</span> <span class="n">schemaString</span> <span class="k">=</span> <span class="s">&quot;name age&quot;</span>
<span class="c1">// Generate the schema based on the string of schema</span>
<span class="k">val</span> <span class="n">fields</span> <span class="k">=</span> <span class="n">schemaString</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span>
<span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">fieldName</span> <span class="k">=&gt;</span> <span class="nc">StructField</span><span class="o">(</span><span class="n">fieldName</span><span class="o">,</span> <span class="nc">StringType</span><span class="o">,</span> <span class="n">nullable</span> <span class="k">=</span> <span class="kc">true</span><span class="o">))</span>
<span class="k">val</span> <span class="n">schema</span> <span class="k">=</span> <span class="nc">StructType</span><span class="o">(</span><span class="n">fields</span><span class="o">)</span>
<span class="c1">// Convert records of the RDD (people) to Rows</span>
<span class="k">val</span> <span class="n">rowRDD</span> <span class="k">=</span> <span class="n">peopleRDD</span>
<span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">))</span>
<span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">attributes</span> <span class="k">=&gt;</span> <span class="nc">Row</span><span class="o">(</span><span class="n">attributes</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">attributes</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">trim</span><span class="o">))</span>
<span class="c1">// Apply the schema to the RDD</span>
<span class="k">val</span> <span class="n">peopleDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="o">(</span><span class="n">rowRDD</span><span class="o">,</span> <span class="n">schema</span><span class="o">)</span>
<span class="c1">// Creates a temporary view using the DataFrame</span>
<span class="n">peopleDF</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;people&quot;</span><span class="o">)</span>
<span class="c1">// SQL can be run over a temporary view created using DataFrames</span>
<span class="k">val</span> <span class="n">results</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;SELECT name FROM people&quot;</span><span class="o">)</span>
<span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations</span>
<span class="c1">// The columns of a row in the result can be accessed by field index or by field name</span>
<span class="n">results</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">attributes</span> <span class="k">=&gt;</span> <span class="s">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">attributes</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +-------------+</span>
<span class="c1">// | value|</span>
<span class="c1">// +-------------+</span>
<span class="c1">// |Name: Michael|</span>
<span class="c1">// | Name: Andy|</span>
<span class="c1">// | Name: Justin|</span>
<span class="c1">// +-------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<p>When JavaBean classes cannot be defined ahead of time (for example,
the structure of records is encoded in a string, or a text dataset will be parsed and
fields will be projected differently for different users),
a <code>Dataset&lt;Row&gt;</code> can be created programmatically with three steps.</p>
<ol>
<li>Create an RDD of <code>Row</code>s from the original RDD;</li>
<li>Create the schema represented by a <code>StructType</code> matching the structure of
<code>Row</code>s in the RDD created in Step 1.</li>
<li>Apply the schema to the RDD of <code>Row</code>s via <code>createDataFrame</code> method provided
by <code>SparkSession</code>.</li>
</ol>
<p>For example:</p>
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.ArrayList</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.api.java.JavaRDD</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.Function</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.DataTypes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructField</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructType</span><span class="o">;</span>
<span class="c1">// Create an RDD</span>
<span class="n">JavaRDD</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">peopleRDD</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sparkContext</span><span class="o">()</span>
<span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/people.txt&quot;</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">toJavaRDD</span><span class="o">();</span>
<span class="c1">// The schema is encoded in a string</span>
<span class="n">String</span> <span class="n">schemaString</span> <span class="o">=</span> <span class="s">&quot;name age&quot;</span><span class="o">;</span>
<span class="c1">// Generate the schema based on the string of schema</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">StructField</span><span class="o">&gt;</span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;&gt;();</span>
<span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">fieldName</span> <span class="o">:</span> <span class="n">schemaString</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
<span class="n">StructField</span> <span class="n">field</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="n">fieldName</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">StringType</span><span class="o">,</span> <span class="kc">true</span><span class="o">);</span>
<span class="n">fields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">field</span><span class="o">);</span>
<span class="o">}</span>
<span class="n">StructType</span> <span class="n">schema</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructType</span><span class="o">(</span><span class="n">fields</span><span class="o">);</span>
<span class="c1">// Convert records of the RDD (people) to Rows</span>
<span class="n">JavaRDD</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">rowRDD</span> <span class="o">=</span> <span class="n">peopleRDD</span><span class="o">.</span><span class="na">map</span><span class="o">((</span><span class="n">Function</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Row</span><span class="o">&gt;)</span> <span class="n">record</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">String</span><span class="o">[]</span> <span class="n">attributes</span> <span class="o">=</span> <span class="n">record</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">);</span>
<span class="k">return</span> <span class="n">RowFactory</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">attributes</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="n">attributes</span><span class="o">[</span><span class="mi">1</span><span class="o">].</span><span class="na">trim</span><span class="o">());</span>
<span class="o">});</span>
<span class="c1">// Apply the schema to the RDD</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">peopleDataFrame</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">rowRDD</span><span class="o">,</span> <span class="n">schema</span><span class="o">);</span>
<span class="c1">// Creates a temporary view using the DataFrame</span>
<span class="n">peopleDataFrame</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;people&quot;</span><span class="o">);</span>
<span class="c1">// SQL can be run over a temporary view created using DataFrames</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">results</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">&quot;SELECT name FROM people&quot;</span><span class="o">);</span>
<span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations</span>
<span class="c1">// The columns of a row in the result can be accessed by field index or by field name</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">namesDS</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="na">map</span><span class="o">(</span>
<span class="o">(</span><span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;)</span> <span class="n">row</span> <span class="o">-&gt;</span> <span class="s">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span>
<span class="n">Encoders</span><span class="o">.</span><span class="na">STRING</span><span class="o">());</span>
<span class="n">namesDS</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +-------------+</span>
<span class="c1">// | value|</span>
<span class="c1">// +-------------+</span>
<span class="c1">// |Name: Michael|</span>
<span class="c1">// | Name: Andy|</span>
<span class="c1">// | Name: Justin|</span>
<span class="c1">// +-------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div>
</div>
<div data-lang="python">
<p>When a dictionary of kwargs cannot be defined ahead of time (for example,
the structure of records is encoded in a string, or a text dataset will be parsed and
fields will be projected differently for different users),
a <code>DataFrame</code> can be created programmatically with three steps.</p>
<ol>
<li>Create an RDD of tuples or lists from the original RDD;</li>
<li>Create the schema represented by a <code>StructType</code> matching the structure of
tuples or lists in the RDD created in step 1.</li>
<li>Apply the schema to the RDD via <code>createDataFrame</code> method provided by <code>SparkSession</code>.</li>
</ol>
<p>For example:</p>
<div class="highlight"><pre><span></span><span class="c1"># Import data types</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="o">*</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span>
<span class="c1"># Load a text file and convert each line to a Row.</span>
<span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s2">&quot;examples/src/main/resources/people.txt&quot;</span><span class="p">)</span>
<span class="n">parts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">l</span><span class="p">:</span> <span class="n">l</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">&quot;,&quot;</span><span class="p">))</span>
<span class="c1"># Each line is converted to a tuple.</span>
<span class="n">people</span> <span class="o">=</span> <span class="n">parts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="p">(</span><span class="n">p</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">p</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">strip</span><span class="p">()))</span>
<span class="c1"># The schema is encoded in a string.</span>
<span class="n">schemaString</span> <span class="o">=</span> <span class="s2">&quot;name age&quot;</span>
<span class="n">fields</span> <span class="o">=</span> <span class="p">[</span><span class="n">StructField</span><span class="p">(</span><span class="n">field_name</span><span class="p">,</span> <span class="n">StringType</span><span class="p">(),</span> <span class="bp">True</span><span class="p">)</span> <span class="k">for</span> <span class="n">field_name</span> <span class="ow">in</span> <span class="n">schemaString</span><span class="o">.</span><span class="n">split</span><span class="p">()]</span>
<span class="n">schema</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">(</span><span class="n">fields</span><span class="p">)</span>
<span class="c1"># Apply the schema to the RDD.</span>
<span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">people</span><span class="p">,</span> <span class="n">schema</span><span class="p">)</span>
<span class="c1"># Creates a temporary view using the DataFrame</span>
<span class="n">schemaPeople</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="p">(</span><span class="s2">&quot;people&quot;</span><span class="p">)</span>
<span class="c1"># SQL can be run over DataFrames that have been registered as a table.</span>
<span class="n">results</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT name FROM people&quot;</span><span class="p">)</span>
<span class="n">results</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +-------+</span>
<span class="c1"># | name|</span>
<span class="c1"># +-------+</span>
<span class="c1"># |Michael|</span>
<span class="c1"># | Andy|</span>
<span class="c1"># | Justin|</span>
<span class="c1"># +-------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div>
</div>
</div>
<h2 id="aggregations">Aggregations</h2>
<p>The <a href="api/scala/index.html#org.apache.spark.sql.functions$">built-in DataFrames functions</a> provide common
aggregations such as <code>count()</code>, <code>countDistinct()</code>, <code>avg()</code>, <code>max()</code>, <code>min()</code>, etc.
While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in
<a href="api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$">Scala</a> and
<a href="api/java/org/apache/spark/sql/expressions/javalang/typed.html">Java</a> to work with strongly typed Datasets.
Moreover, users are not limited to the predefined aggregate functions and can create their own.</p>
<h3 id="untyped-user-defined-aggregate-functions">Untyped User-Defined Aggregate Functions</h3>
<p>Users have to extend the <a href="api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction">UserDefinedAggregateFunction</a>
abstract class to implement a custom untyped aggregate function. For example, a user-defined average
can look like:</p>
<div class="codetabs">
<div data-lang="scala">
<div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.</span><span class="o">{</span><span class="nc">Row</span><span class="o">,</span> <span class="nc">SparkSession</span><span class="o">}</span>
<span class="k">import</span> <span class="nn">org.apache.spark.sql.expressions.MutableAggregationBuffer</span>
<span class="k">import</span> <span class="nn">org.apache.spark.sql.expressions.UserDefinedAggregateFunction</span>
<span class="k">import</span> <span class="nn">org.apache.spark.sql.types._</span>
<span class="k">object</span> <span class="nc">MyAverage</span> <span class="k">extends</span> <span class="nc">UserDefinedAggregateFunction</span> <span class="o">{</span>
<span class="c1">// Data types of input arguments of this aggregate function</span>
<span class="k">def</span> <span class="n">inputSchema</span><span class="k">:</span> <span class="kt">StructType</span> <span class="o">=</span> <span class="nc">StructType</span><span class="o">(</span><span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;inputColumn&quot;</span><span class="o">,</span> <span class="nc">LongType</span><span class="o">)</span> <span class="o">::</span> <span class="nc">Nil</span><span class="o">)</span>
<span class="c1">// Data types of values in the aggregation buffer</span>
<span class="k">def</span> <span class="n">bufferSchema</span><span class="k">:</span> <span class="kt">StructType</span> <span class="o">=</span> <span class="o">{</span>
<span class="nc">StructType</span><span class="o">(</span><span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;sum&quot;</span><span class="o">,</span> <span class="nc">LongType</span><span class="o">)</span> <span class="o">::</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">,</span> <span class="nc">LongType</span><span class="o">)</span> <span class="o">::</span> <span class="nc">Nil</span><span class="o">)</span>
<span class="o">}</span>
<span class="c1">// The data type of the returned value</span>
<span class="k">def</span> <span class="n">dataType</span><span class="k">:</span> <span class="kt">DataType</span> <span class="o">=</span> <span class="nc">DoubleType</span>
<span class="c1">// Whether this function always returns the same output for identical input</span>
<span class="k">def</span> <span class="n">deterministic</span><span class="k">:</span> <span class="kt">Boolean</span> <span class="o">=</span> <span class="kc">true</span>
<span class="c1">// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to</span>
<span class="c1">// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides</span>
<span class="c1">// the opportunity to update its values. Note that arrays and maps inside the buffer are still</span>
<span class="c1">// immutable.</span>
<span class="k">def</span> <span class="n">initialize</span><span class="o">(</span><span class="n">buffer</span><span class="k">:</span> <span class="kt">MutableAggregationBuffer</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">buffer</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="k">=</span> <span class="mi">0L</span>
<span class="n">buffer</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="k">=</span> <span class="mi">0L</span>
<span class="o">}</span>
<span class="c1">// Updates the given aggregation buffer `buffer` with new input data from `input`</span>
<span class="k">def</span> <span class="n">update</span><span class="o">(</span><span class="n">buffer</span><span class="k">:</span> <span class="kt">MutableAggregationBuffer</span><span class="o">,</span> <span class="n">input</span><span class="k">:</span> <span class="kt">Row</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="k">if</span> <span class="o">(!</span><span class="n">input</span><span class="o">.</span><span class="n">isNullAt</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="o">{</span>
<span class="n">buffer</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="k">=</span> <span class="n">buffer</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="n">input</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="n">buffer</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="k">=</span> <span class="n">buffer</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">+</span> <span class="mi">1</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`</span>
<span class="k">def</span> <span class="n">merge</span><span class="o">(</span><span class="n">buffer1</span><span class="k">:</span> <span class="kt">MutableAggregationBuffer</span><span class="o">,</span> <span class="n">buffer2</span><span class="k">:</span> <span class="kt">Row</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">buffer1</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="k">=</span> <span class="n">buffer1</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="n">buffer2</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
<span class="n">buffer1</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="k">=</span> <span class="n">buffer1</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">+</span> <span class="n">buffer2</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="o">}</span>
<span class="c1">// Calculates the final result</span>
<span class="k">def</span> <span class="n">evaluate</span><span class="o">(</span><span class="n">buffer</span><span class="k">:</span> <span class="kt">Row</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">toDouble</span> <span class="o">/</span> <span class="n">buffer</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="o">}</span>
<span class="c1">// Register the function to access it</span>
<span class="n">spark</span><span class="o">.</span><span class="n">udf</span><span class="o">.</span><span class="n">register</span><span class="o">(</span><span class="s">&quot;myAverage&quot;</span><span class="o">,</span> <span class="nc">MyAverage</span><span class="o">)</span>
<span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/employees.json&quot;</span><span class="o">)</span>
<span class="n">df</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;employees&quot;</span><span class="o">)</span>
<span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// | name|salary|</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// |Michael| 3000|</span>
<span class="c1">// | Andy| 4500|</span>
<span class="c1">// | Justin| 3500|</span>
<span class="c1">// | Berta| 4000|</span>
<span class="c1">// +-------+------+</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;SELECT myAverage(salary) as average_salary FROM employees&quot;</span><span class="o">)</span>
<span class="n">result</span><span class="o">.</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +--------------+</span>
<span class="c1">// |average_salary|</span>
<span class="c1">// +--------------+</span>
<span class="c1">// | 3750.0|</span>
<span class="c1">// +--------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.ArrayList</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.expressions.MutableAggregationBuffer</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.expressions.UserDefinedAggregateFunction</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.DataType</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.DataTypes</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructField</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructType</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">MyAverage</span> <span class="kd">extends</span> <span class="n">UserDefinedAggregateFunction</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">StructType</span> <span class="n">inputSchema</span><span class="o">;</span>
<span class="kd">private</span> <span class="n">StructType</span> <span class="n">bufferSchema</span><span class="o">;</span>
<span class="kd">public</span> <span class="nf">MyAverage</span><span class="o">()</span> <span class="o">{</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">StructField</span><span class="o">&gt;</span> <span class="n">inputFields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;&gt;();</span>
<span class="n">inputFields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="s">&quot;inputColumn&quot;</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">LongType</span><span class="o">,</span> <span class="kc">true</span><span class="o">));</span>
<span class="n">inputSchema</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructType</span><span class="o">(</span><span class="n">inputFields</span><span class="o">);</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">StructField</span><span class="o">&gt;</span> <span class="n">bufferFields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;&gt;();</span>
<span class="n">bufferFields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="s">&quot;sum&quot;</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">LongType</span><span class="o">,</span> <span class="kc">true</span><span class="o">));</span>
<span class="n">bufferFields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">LongType</span><span class="o">,</span> <span class="kc">true</span><span class="o">));</span>
<span class="n">bufferSchema</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructType</span><span class="o">(</span><span class="n">bufferFields</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Data types of input arguments of this aggregate function</span>
<span class="kd">public</span> <span class="n">StructType</span> <span class="nf">inputSchema</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">inputSchema</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// Data types of values in the aggregation buffer</span>
<span class="kd">public</span> <span class="n">StructType</span> <span class="nf">bufferSchema</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">bufferSchema</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// The data type of the returned value</span>
<span class="kd">public</span> <span class="n">DataType</span> <span class="nf">dataType</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">DoubleType</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// Whether this function always returns the same output for identical input</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">deterministic</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="kc">true</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to</span>
<span class="c1">// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides</span>
<span class="c1">// the opportunity to update its values. Note that arrays and maps inside the buffer are still</span>
<span class="c1">// immutable.</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">initialize</span><span class="o">(</span><span class="n">MutableAggregationBuffer</span> <span class="n">buffer</span><span class="o">)</span> <span class="o">{</span>
<span class="n">buffer</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">0</span><span class="n">L</span><span class="o">);</span>
<span class="n">buffer</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">0</span><span class="n">L</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Updates the given aggregation buffer `buffer` with new input data from `input`</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">update</span><span class="o">(</span><span class="n">MutableAggregationBuffer</span> <span class="n">buffer</span><span class="o">,</span> <span class="n">Row</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span>
<span class="k">if</span> <span class="o">(!</span><span class="n">input</span><span class="o">.</span><span class="na">isNullAt</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="o">{</span>
<span class="kt">long</span> <span class="n">updatedSum</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="n">input</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span>
<span class="kt">long</span> <span class="n">updatedCount</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span>
<span class="n">buffer</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">updatedSum</span><span class="o">);</span>
<span class="n">buffer</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">updatedCount</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">merge</span><span class="o">(</span><span class="n">MutableAggregationBuffer</span> <span class="n">buffer1</span><span class="o">,</span> <span class="n">Row</span> <span class="n">buffer2</span><span class="o">)</span> <span class="o">{</span>
<span class="kt">long</span> <span class="n">mergedSum</span> <span class="o">=</span> <span class="n">buffer1</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="n">buffer2</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span>
<span class="kt">long</span> <span class="n">mergedCount</span> <span class="o">=</span> <span class="n">buffer1</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">+</span> <span class="n">buffer2</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">buffer1</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">mergedSum</span><span class="o">);</span>
<span class="n">buffer1</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">mergedCount</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Calculates the final result</span>
<span class="kd">public</span> <span class="n">Double</span> <span class="nf">evaluate</span><span class="o">(</span><span class="n">Row</span> <span class="n">buffer</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="o">((</span><span class="kt">double</span><span class="o">)</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="o">/</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// Register the function to access it</span>
<span class="n">spark</span><span class="o">.</span><span class="na">udf</span><span class="o">().</span><span class="na">register</span><span class="o">(</span><span class="s">&quot;myAverage&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">MyAverage</span><span class="o">());</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/employees.json&quot;</span><span class="o">);</span>
<span class="n">df</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;employees&quot;</span><span class="o">);</span>
<span class="n">df</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// | name|salary|</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// |Michael| 3000|</span>
<span class="c1">// | Andy| 4500|</span>
<span class="c1">// | Justin| 3500|</span>
<span class="c1">// | Berta| 4000|</span>
<span class="c1">// +-------+------+</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">&quot;SELECT myAverage(salary) as average_salary FROM employees&quot;</span><span class="o">);</span>
<span class="n">result</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +--------------+</span>
<span class="c1">// |average_salary|</span>
<span class="c1">// +--------------+</span>
<span class="c1">// | 3750.0|</span>
<span class="c1">// +--------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java" in the Spark repo.</small></div>
</div>
</div>
<h3 id="type-safe-user-defined-aggregate-functions">Type-Safe User-Defined Aggregate Functions</h3>
<p>User-defined aggregations for strongly typed Datasets revolve around the <a href="api/scala/index.html#org.apache.spark.sql.expressions.Aggregator">Aggregator</a> abstract class.
For example, a type-safe user-defined average can look like:</p>
<div class="codetabs">
<div data-lang="scala">
<div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.</span><span class="o">{</span><span class="nc">Encoder</span><span class="o">,</span> <span class="nc">Encoders</span><span class="o">,</span> <span class="nc">SparkSession</span><span class="o">}</span>
<span class="k">import</span> <span class="nn">org.apache.spark.sql.expressions.Aggregator</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">Employee</span><span class="o">(</span><span class="n">name</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">salary</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">Average</span><span class="o">(</span><span class="k">var</span> <span class="n">sum</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="k">var</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
<span class="k">object</span> <span class="nc">MyAverage</span> <span class="k">extends</span> <span class="nc">Aggregator</span><span class="o">[</span><span class="kt">Employee</span>, <span class="kt">Average</span>, <span class="kt">Double</span><span class="o">]</span> <span class="o">{</span>
<span class="c1">// A zero value for this aggregation. Should satisfy the property that any b + zero = b</span>
<span class="k">def</span> <span class="n">zero</span><span class="k">:</span> <span class="kt">Average</span> <span class="o">=</span> <span class="nc">Average</span><span class="o">(</span><span class="mi">0L</span><span class="o">,</span> <span class="mi">0L</span><span class="o">)</span>
<span class="c1">// Combine two values to produce a new value. For performance, the function may modify `buffer`</span>
<span class="c1">// and return it instead of constructing a new object</span>
<span class="k">def</span> <span class="n">reduce</span><span class="o">(</span><span class="n">buffer</span><span class="k">:</span> <span class="kt">Average</span><span class="o">,</span> <span class="n">employee</span><span class="k">:</span> <span class="kt">Employee</span><span class="o">)</span><span class="k">:</span> <span class="kt">Average</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">buffer</span><span class="o">.</span><span class="n">sum</span> <span class="o">+=</span> <span class="n">employee</span><span class="o">.</span><span class="n">salary</span>
<span class="n">buffer</span><span class="o">.</span><span class="n">count</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="n">buffer</span>
<span class="o">}</span>
<span class="c1">// Merge two intermediate values</span>
<span class="k">def</span> <span class="n">merge</span><span class="o">(</span><span class="n">b1</span><span class="k">:</span> <span class="kt">Average</span><span class="o">,</span> <span class="n">b2</span><span class="k">:</span> <span class="kt">Average</span><span class="o">)</span><span class="k">:</span> <span class="kt">Average</span> <span class="o">=</span> <span class="o">{</span>
<span class="n">b1</span><span class="o">.</span><span class="n">sum</span> <span class="o">+=</span> <span class="n">b2</span><span class="o">.</span><span class="n">sum</span>
<span class="n">b1</span><span class="o">.</span><span class="n">count</span> <span class="o">+=</span> <span class="n">b2</span><span class="o">.</span><span class="n">count</span>
<span class="n">b1</span>
<span class="o">}</span>
<span class="c1">// Transform the output of the reduction</span>
<span class="k">def</span> <span class="n">finish</span><span class="o">(</span><span class="n">reduction</span><span class="k">:</span> <span class="kt">Average</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="n">reduction</span><span class="o">.</span><span class="n">sum</span><span class="o">.</span><span class="n">toDouble</span> <span class="o">/</span> <span class="n">reduction</span><span class="o">.</span><span class="n">count</span>
<span class="c1">// Specifies the Encoder for the intermediate value type</span>
<span class="k">def</span> <span class="n">bufferEncoder</span><span class="k">:</span> <span class="kt">Encoder</span><span class="o">[</span><span class="kt">Average</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Encoders</span><span class="o">.</span><span class="n">product</span>
<span class="c1">// Specifies the Encoder for the final output value type</span>
<span class="k">def</span> <span class="n">outputEncoder</span><span class="k">:</span> <span class="kt">Encoder</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Encoders</span><span class="o">.</span><span class="n">scalaDouble</span>
<span class="o">}</span>
<span class="k">val</span> <span class="n">ds</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/employees.json&quot;</span><span class="o">).</span><span class="n">as</span><span class="o">[</span><span class="kt">Employee</span><span class="o">]</span>
<span class="n">ds</span><span class="o">.</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// | name|salary|</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// |Michael| 3000|</span>
<span class="c1">// | Andy| 4500|</span>
<span class="c1">// | Justin| 3500|</span>
<span class="c1">// | Berta| 4000|</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// Convert the function to a `TypedColumn` and give it a name</span>
<span class="k">val</span> <span class="n">averageSalary</span> <span class="k">=</span> <span class="nc">MyAverage</span><span class="o">.</span><span class="n">toColumn</span><span class="o">.</span><span class="n">name</span><span class="o">(</span><span class="s">&quot;average_salary&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">ds</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="n">averageSalary</span><span class="o">)</span>
<span class="n">result</span><span class="o">.</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +--------------+</span>
<span class="c1">// |average_salary|</span>
<span class="c1">// +--------------+</span>
<span class="c1">// | 3750.0|</span>
<span class="c1">// +--------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.io.Serializable</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoders</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.TypedColumn</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.expressions.Aggregator</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Employee</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
<span class="kd">private</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span>
<span class="kd">private</span> <span class="kt">long</span> <span class="n">salary</span><span class="o">;</span>
<span class="c1">// Constructors, getters, setters...</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Average</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">long</span> <span class="n">sum</span><span class="o">;</span>
<span class="kd">private</span> <span class="kt">long</span> <span class="n">count</span><span class="o">;</span>
<span class="c1">// Constructors, getters, setters...</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">MyAverage</span> <span class="kd">extends</span> <span class="n">Aggregator</span><span class="o">&lt;</span><span class="n">Employee</span><span class="o">,</span> <span class="n">Average</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="c1">// A zero value for this aggregation. Should satisfy the property that any b + zero = b</span>
<span class="kd">public</span> <span class="n">Average</span> <span class="nf">zero</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">Average</span><span class="o">(</span><span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="mi">0</span><span class="n">L</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Combine two values to produce a new value. For performance, the function may modify `buffer`</span>
<span class="c1">// and return it instead of constructing a new object</span>
<span class="kd">public</span> <span class="n">Average</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Average</span> <span class="n">buffer</span><span class="o">,</span> <span class="n">Employee</span> <span class="n">employee</span><span class="o">)</span> <span class="o">{</span>
<span class="kt">long</span> <span class="n">newSum</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getSum</span><span class="o">()</span> <span class="o">+</span> <span class="n">employee</span><span class="o">.</span><span class="na">getSalary</span><span class="o">();</span>
<span class="kt">long</span> <span class="n">newCount</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getCount</span><span class="o">()</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span>
<span class="n">buffer</span><span class="o">.</span><span class="na">setSum</span><span class="o">(</span><span class="n">newSum</span><span class="o">);</span>
<span class="n">buffer</span><span class="o">.</span><span class="na">setCount</span><span class="o">(</span><span class="n">newCount</span><span class="o">);</span>
<span class="k">return</span> <span class="n">buffer</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// Merge two intermediate values</span>
<span class="kd">public</span> <span class="n">Average</span> <span class="nf">merge</span><span class="o">(</span><span class="n">Average</span> <span class="n">b1</span><span class="o">,</span> <span class="n">Average</span> <span class="n">b2</span><span class="o">)</span> <span class="o">{</span>
<span class="kt">long</span> <span class="n">mergedSum</span> <span class="o">=</span> <span class="n">b1</span><span class="o">.</span><span class="na">getSum</span><span class="o">()</span> <span class="o">+</span> <span class="n">b2</span><span class="o">.</span><span class="na">getSum</span><span class="o">();</span>
<span class="kt">long</span> <span class="n">mergedCount</span> <span class="o">=</span> <span class="n">b1</span><span class="o">.</span><span class="na">getCount</span><span class="o">()</span> <span class="o">+</span> <span class="n">b2</span><span class="o">.</span><span class="na">getCount</span><span class="o">();</span>
<span class="n">b1</span><span class="o">.</span><span class="na">setSum</span><span class="o">(</span><span class="n">mergedSum</span><span class="o">);</span>
<span class="n">b1</span><span class="o">.</span><span class="na">setCount</span><span class="o">(</span><span class="n">mergedCount</span><span class="o">);</span>
<span class="k">return</span> <span class="n">b1</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// Transform the output of the reduction</span>
<span class="kd">public</span> <span class="n">Double</span> <span class="nf">finish</span><span class="o">(</span><span class="n">Average</span> <span class="n">reduction</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="o">((</span><span class="kt">double</span><span class="o">)</span> <span class="n">reduction</span><span class="o">.</span><span class="na">getSum</span><span class="o">())</span> <span class="o">/</span> <span class="n">reduction</span><span class="o">.</span><span class="na">getCount</span><span class="o">();</span>
<span class="o">}</span>
<span class="c1">// Specifies the Encoder for the intermediate value type</span>
<span class="kd">public</span> <span class="n">Encoder</span><span class="o">&lt;</span><span class="n">Average</span><span class="o">&gt;</span> <span class="nf">bufferEncoder</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">bean</span><span class="o">(</span><span class="n">Average</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Specifies the Encoder for the final output value type</span>
<span class="kd">public</span> <span class="n">Encoder</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="nf">outputEncoder</span><span class="o">()</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">DOUBLE</span><span class="o">();</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">Encoder</span><span class="o">&lt;</span><span class="n">Employee</span><span class="o">&gt;</span> <span class="n">employeeEncoder</span> <span class="o">=</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">bean</span><span class="o">(</span><span class="n">Employee</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">String</span> <span class="n">path</span> <span class="o">=</span> <span class="s">&quot;examples/src/main/resources/employees.json&quot;</span><span class="o">;</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Employee</span><span class="o">&gt;</span> <span class="n">ds</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="n">path</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="n">employeeEncoder</span><span class="o">);</span>
<span class="n">ds</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// | name|salary|</span>
<span class="c1">// +-------+------+</span>
<span class="c1">// |Michael| 3000|</span>
<span class="c1">// | Andy| 4500|</span>
<span class="c1">// | Justin| 3500|</span>
<span class="c1">// | Berta| 4000|</span>
<span class="c1">// +-------+------+</span>
<span class="n">MyAverage</span> <span class="n">myAverage</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyAverage</span><span class="o">();</span>
<span class="c1">// Convert the function to a `TypedColumn` and give it a name</span>
<span class="n">TypedColumn</span><span class="o">&lt;</span><span class="n">Employee</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">averageSalary</span> <span class="o">=</span> <span class="n">myAverage</span><span class="o">.</span><span class="na">toColumn</span><span class="o">().</span><span class="na">name</span><span class="o">(</span><span class="s">&quot;average_salary&quot;</span><span class="o">);</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">averageSalary</span><span class="o">);</span>
<span class="n">result</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +--------------+</span>
<span class="c1">// |average_salary|</span>
<span class="c1">// +--------------+</span>
<span class="c1">// | 3750.0|</span>
<span class="c1">// +--------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java" in the Spark repo.</small></div>
</div>
</div>
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-1.12.4.min.js"></script>
<script src="js/vendor/bootstrap.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
// Enable automatic AMS-style equation numbering for TeX math on this page.
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Dynamically inject the MathJax loader script and configure its inline/display
// math delimiters once it has loaded. An explicit "https://" URL works for
// pages served over HTTP, HTTPS, and local file:// alike (a protocol-relative
// "//cdn..." URL would break under file://, and sniffing the page protocol to
// fall back to "http://" needlessly loads the script over cleartext).
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
// Configure tex2jax only after MathJax is available.
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
// Never typeset math inside code/preformatted or script regions.
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = 'https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>