| |
| <!DOCTYPE html> |
<!--[if lt IE 7]> <html lang="en" class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html lang="en" class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html lang="en" class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html lang="en" class="no-js"> <!--<![endif]-->
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <title>Getting Started - Spark 2.4.7 Documentation</title> |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <style> |
| body { |
| padding-top: 60px; |
| padding-bottom: 40px; |
| } |
| </style> |
<meta name="viewport" content="width=device-width, initial-scale=1">
| <link rel="stylesheet" href="css/bootstrap-responsive.min.css"> |
| <link rel="stylesheet" href="css/main.css"> |
| |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| |
| |
| <!-- Google analytics script --> |
<script>
  // Classic Google Analytics (ga.js) loader: queue the tracker commands,
  // then asynchronously inject the ga.js script before the first <script>
  // element already in the document.
  var _gaq = _gaq || [];
  _gaq.push(['_setAccount', 'UA-32518208-2']);
  _gaq.push(['_trackPageview']);

  (function () {
    var tracker = document.createElement('script');
    tracker.type = 'text/javascript';
    tracker.async = true;
    // Pick the SSL or plain host to match the page's own protocol.
    tracker.src = (document.location.protocol === 'https:' ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
    var firstScript = document.getElementsByTagName('script')[0];
    firstScript.parentNode.insertBefore(tracker, firstScript);
  })();
</script>
| |
| |
| </head> |
| <body> |
| <!--[if lt IE 7]> |
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="https://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
| <![endif]--> |
| |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| |
| <div class="navbar navbar-fixed-top" id="topbar"> |
| <div class="navbar-inner"> |
| <div class="container"> |
| <div class="brand"><a href="index.html"> |
<img src="img/spark-logo-hd.png" alt="Spark" style="height:50px;"></a><span class="version">2.4.7</span>
| </div> |
| <ul class="nav"> |
| <!--TODO(andyk): Add class="active" attribute to li some how.--> |
| <li><a href="index.html">Overview</a></li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="quick-start.html">Quick Start</a></li> |
| <li><a href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a></li> |
| <li><a href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a></li> |
| <li><a href="structured-streaming-programming-guide.html">Structured Streaming</a></li> |
| <li><a href="streaming-programming-guide.html">Spark Streaming (DStreams)</a></li> |
| <li><a href="ml-guide.html">MLlib (Machine Learning)</a></li> |
| <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li> |
| <li><a href="sparkr.html">SparkR (R on Spark)</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li> |
| <li><a href="api/java/index.html">Java</a></li> |
| <li><a href="api/python/index.html">Python</a></li> |
| <li><a href="api/R/index.html">R</a></li> |
| <li><a href="api/sql/index.html">SQL, Built-in Functions</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="cluster-overview.html">Overview</a></li> |
| <li><a href="submitting-applications.html">Submitting Applications</a></li> |
| <li class="divider"></li> |
| <li><a href="spark-standalone.html">Spark Standalone</a></li> |
| <li><a href="running-on-mesos.html">Mesos</a></li> |
| <li><a href="running-on-yarn.html">YARN</a></li> |
| <li><a href="running-on-kubernetes.html">Kubernetes</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="configuration.html">Configuration</a></li> |
| <li><a href="monitoring.html">Monitoring</a></li> |
| <li><a href="tuning.html">Tuning Guide</a></li> |
| <li><a href="job-scheduling.html">Job Scheduling</a></li> |
| <li><a href="security.html">Security</a></li> |
| <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li> |
| <li class="divider"></li> |
| <li><a href="building-spark.html">Building Spark</a></li> |
| <li><a href="https://spark.apache.org/contributing.html">Contributing to Spark</a></li> |
| <li><a href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a></li> |
| </ul> |
| </li> |
| </ul> |
| <!--<p class="navbar-text pull-right"><span class="version-text">v2.4.7</span></p>--> |
| </div> |
| </div> |
| </div> |
| |
| <div class="container-wrapper"> |
| |
| |
| |
| <div class="left-menu-wrapper"> |
| <div class="left-menu"> |
| <h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3> |
| |
| <ul> |
| |
<li>
  <a href="sql-getting-started.html">
    <b>Getting Started</b>
  </a>
  <!-- Sub-list nested inside the <li> so the <ul> content model stays valid -->
  <ul>
    <li>
      <a href="sql-getting-started.html#starting-point-sparksession">
        Starting Point: SparkSession
      </a>
    </li>
    <li>
      <a href="sql-getting-started.html#creating-dataframes">
        Creating DataFrames
      </a>
    </li>
    <li>
      <a href="sql-getting-started.html#untyped-dataset-operations-aka-dataframe-operations">
        Untyped Dataset Operations (aka DataFrame Operations)
      </a>
    </li>
    <li>
      <a href="sql-getting-started.html#running-sql-queries-programmatically">
        Running SQL Queries Programmatically
      </a>
    </li>
    <li>
      <a href="sql-getting-started.html#global-temporary-view">
        Global Temporary View
      </a>
    </li>
    <li>
      <a href="sql-getting-started.html#creating-datasets">
        Creating Datasets
      </a>
    </li>
    <li>
      <a href="sql-getting-started.html#interoperating-with-rdds">
        Interoperating with RDDs
      </a>
    </li>
    <li>
      <a href="sql-getting-started.html#aggregations">
        Aggregations
      </a>
    </li>
  </ul>
</li>
| |
| |
| |
| <li> |
| <a href="sql-data-sources.html"> |
| |
| Data Sources |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-performance-tuning.html"> |
| |
| Performance Tuning |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-distributed-sql-engine.html"> |
| |
| Distributed SQL Engine |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-pyspark-pandas-with-arrow.html"> |
| |
| PySpark Usage Guide for Pandas with Apache Arrow |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-migration-guide.html"> |
| |
| Migration Guide |
| |
| </a> |
| </li> |
| |
| |
| |
| <li> |
| <a href="sql-reference.html"> |
| |
| Reference |
| |
| </a> |
| </li> |
| |
| |
| |
| </ul> |
| |
| </div> |
| </div> |
| |
| <input id="nav-trigger" class="nav-trigger" checked type="checkbox"> |
| <label for="nav-trigger"></label> |
| <div class="content-with-sidebar" id="content"> |
| |
| <h1 class="title">Getting Started</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#starting-point-sparksession" id="markdown-toc-starting-point-sparksession">Starting Point: SparkSession</a></li> |
| <li><a href="#creating-dataframes" id="markdown-toc-creating-dataframes">Creating DataFrames</a></li> |
| <li><a href="#untyped-dataset-operations-aka-dataframe-operations" id="markdown-toc-untyped-dataset-operations-aka-dataframe-operations">Untyped Dataset Operations (aka DataFrame Operations)</a></li> |
| <li><a href="#running-sql-queries-programmatically" id="markdown-toc-running-sql-queries-programmatically">Running SQL Queries Programmatically</a></li> |
| <li><a href="#global-temporary-view" id="markdown-toc-global-temporary-view">Global Temporary View</a></li> |
| <li><a href="#creating-datasets" id="markdown-toc-creating-datasets">Creating Datasets</a></li> |
| <li><a href="#interoperating-with-rdds" id="markdown-toc-interoperating-with-rdds">Interoperating with RDDs</a> <ul> |
| <li><a href="#inferring-the-schema-using-reflection" id="markdown-toc-inferring-the-schema-using-reflection">Inferring the Schema Using Reflection</a></li> |
| <li><a href="#programmatically-specifying-the-schema" id="markdown-toc-programmatically-specifying-the-schema">Programmatically Specifying the Schema</a></li> |
| </ul> |
| </li> |
| <li><a href="#aggregations" id="markdown-toc-aggregations">Aggregations</a> <ul> |
| <li><a href="#untyped-user-defined-aggregate-functions" id="markdown-toc-untyped-user-defined-aggregate-functions">Untyped User-Defined Aggregate Functions</a></li> |
| <li><a href="#type-safe-user-defined-aggregate-functions" id="markdown-toc-type-safe-user-defined-aggregate-functions">Type-Safe User-Defined Aggregate Functions</a></li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h2 id="starting-point-sparksession">Starting Point: SparkSession</h2> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <p>The entry point into all functionality in Spark is the <a href="api/scala/index.html#org.apache.spark.sql.SparkSession"><code>SparkSession</code></a> class. To create a basic <code>SparkSession</code>, just use <code>SparkSession.builder()</code>:</p> |
| |
| <div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span> |
| |
| <span class="k">val</span> <span class="n">spark</span> <span class="k">=</span> <span class="nc">SparkSession</span> |
| <span class="o">.</span><span class="n">builder</span><span class="o">()</span> |
| <span class="o">.</span><span class="n">appName</span><span class="o">(</span><span class="s">"Spark SQL basic example"</span><span class="o">)</span> |
| <span class="o">.</span><span class="n">config</span><span class="o">(</span><span class="s">"spark.some.config.option"</span><span class="o">,</span> <span class="s">"some-value"</span><span class="o">)</span> |
| <span class="o">.</span><span class="n">getOrCreate</span><span class="o">()</span> |
| |
| <span class="c1">// For implicit conversions like converting RDDs to DataFrames</span> |
| <span class="k">import</span> <span class="nn">spark.implicits._</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>The entry point into all functionality in Spark is the <a href="api/java/index.html#org.apache.spark.sql.SparkSession"><code>SparkSession</code></a> class. To create a basic <code>SparkSession</code>, just use <code>SparkSession.builder()</code>:</p> |
| |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span><span class="o">;</span> |
| |
| <span class="n">SparkSession</span> <span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span> |
| <span class="o">.</span><span class="na">builder</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">appName</span><span class="o">(</span><span class="s">"Java Spark SQL basic example"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">config</span><span class="o">(</span><span class="s">"spark.some.config.option"</span><span class="o">,</span> <span class="s">"some-value"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">getOrCreate</span><span class="o">();</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>The entry point into all functionality in Spark is the <a href="api/python/pyspark.sql.html#pyspark.sql.SparkSession"><code>SparkSession</code></a> class. To create a basic <code>SparkSession</code>, just use <code>SparkSession.builder</code>:</p> |
| |
| <div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span> |
| |
| <span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span> \ |
| <span class="o">.</span><span class="n">builder</span> \ |
| <span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">"Python Spark SQL basic example"</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">config</span><span class="p">(</span><span class="s2">"spark.some.config.option"</span><span class="p">,</span> <span class="s2">"some-value"</span><span class="p">)</span> \ |
| <span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="r"> |
| |
| <p>The entry point into all functionality in Spark is the <a href="api/R/sparkR.session.html"><code>SparkSession</code></a> class. To initialize a basic <code>SparkSession</code>, just call <code>sparkR.session()</code>:</p> |
| |
| <div class="highlight"><pre><span></span>sparkR.session<span class="p">(</span>appName <span class="o">=</span> <span class="s">"R Spark SQL basic example"</span><span class="p">,</span> sparkConfig <span class="o">=</span> <span class="kt">list</span><span class="p">(</span>spark.some.config.option <span class="o">=</span> <span class="s">"some-value"</span><span class="p">))</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div> |
| |
| <p>Note that when invoked for the first time, <code>sparkR.session()</code> initializes a global <code>SparkSession</code> singleton instance, and always returns a reference to this instance for successive invocations. In this way, users only need to initialize the <code>SparkSession</code> once, then SparkR functions like <code>read.df</code> will be able to access this global instance implicitly, and users don’t need to pass the <code>SparkSession</code> instance around.</p> |
| </div> |
| </div> |
| |
<p><code>SparkSession</code> in Spark 2.0 provides built-in support for Hive features including the ability to
| write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables. |
| To use these features, you do not need to have an existing Hive setup.</p> |
| |
| <h2 id="creating-dataframes">Creating DataFrames</h2> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| <p>With a <code>SparkSession</code>, applications can create DataFrames from an <a href="#interoperating-with-rdds">existing <code>RDD</code></a>, |
| from a Hive table, or from <a href="sql-data-sources.html">Spark data sources</a>.</p> |
| |
| <p>As an example, the following creates a DataFrame based on the content of a JSON file:</p> |
| |
| <div class="highlight"><pre><span></span><span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">)</span> |
| |
| <span class="c1">// Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="java"> |
| <p>With a <code>SparkSession</code>, applications can create DataFrames from an <a href="#interoperating-with-rdds">existing <code>RDD</code></a>, |
| from a Hive table, or from <a href="sql-data-sources.html">Spark data sources</a>.</p> |
| |
| <p>As an example, the following creates a DataFrame based on the content of a JSON file:</p> |
| |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">);</span> |
| |
| <span class="c1">// Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="python"> |
| <p>With a <code>SparkSession</code>, applications can create DataFrames from an <a href="#interoperating-with-rdds">existing <code>RDD</code></a>, |
| from a Hive table, or from <a href="sql-data-sources.html">Spark data sources</a>.</p> |
| |
| <p>As an example, the following creates a DataFrame based on the content of a JSON file:</p> |
| |
| <div class="highlight"><pre><span></span><span class="c1"># spark is an existing SparkSession</span> |
| <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="p">(</span><span class="s2">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| <span class="c1"># Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +----+-------+</span> |
| <span class="c1"># | age| name|</span> |
| <span class="c1"># +----+-------+</span> |
| <span class="c1"># |null|Michael|</span> |
| <span class="c1"># | 30| Andy|</span> |
| <span class="c1"># | 19| Justin|</span> |
| <span class="c1"># +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="r"> |
| <p>With a <code>SparkSession</code>, applications can create DataFrames from a local R data.frame, |
| from a Hive table, or from <a href="sql-data-sources.html">Spark data sources</a>.</p> |
| |
| <p>As an example, the following creates a DataFrame based on the content of a JSON file:</p> |
| |
| <div class="highlight"><pre><span></span>df <span class="o"><-</span> read.json<span class="p">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c1"># Displays the content of the DataFrame</span> |
| <span class="kp">head</span><span class="p">(</span>df<span class="p">)</span> |
| <span class="c1">## age name</span> |
| <span class="c1">## 1 NA Michael</span> |
| <span class="c1">## 2 30 Andy</span> |
| <span class="c1">## 3 19 Justin</span> |
| |
| <span class="c1"># Another method to print the first few rows and optionally truncate the printing of long values</span> |
| showDF<span class="p">(</span>df<span class="p">)</span> |
| <span class="c1">## +----+-------+</span> |
| <span class="c1">## | age| name|</span> |
| <span class="c1">## +----+-------+</span> |
| <span class="c1">## |null|Michael|</span> |
| <span class="c1">## | 30| Andy|</span> |
| <span class="c1">## | 19| Justin|</span> |
| <span class="c1">## +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div> |
| |
| </div> |
| </div> |
| |
| <h2 id="untyped-dataset-operations-aka-dataframe-operations">Untyped Dataset Operations (aka DataFrame Operations)</h2> |
| |
| <p>DataFrames provide a domain-specific language for structured data manipulation in <a href="api/scala/index.html#org.apache.spark.sql.Dataset">Scala</a>, <a href="api/java/index.html?org/apache/spark/sql/Dataset.html">Java</a>, <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">Python</a> and <a href="api/R/SparkDataFrame.html">R</a>.</p> |
| |
<p>As mentioned above, in Spark 2.0, DataFrames are just Datasets of <code>Row</code>s in the Scala and Java APIs. These operations are also referred to as &#8220;untyped transformations&#8221; in contrast to &#8220;typed transformations&#8221; that come with strongly typed Scala/Java Datasets.</p>
| |
| <p>Here we include some basic examples of structured data processing using Datasets:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| <div class="highlight"><pre><span></span><span class="c1">// This import is needed to use the $-notation</span> |
| <span class="k">import</span> <span class="nn">spark.implicits._</span> |
| <span class="c1">// Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: long (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +-------+</span> |
| <span class="c1">// | name|</span> |
| <span class="c1">// +-------+</span> |
| <span class="c1">// |Michael|</span> |
| <span class="c1">// | Andy|</span> |
| <span class="c1">// | Justin|</span> |
| <span class="c1">// +-------+</span> |
| |
| <span class="c1">// Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="n">$</span><span class="s">"name"</span><span class="o">,</span> <span class="n">$</span><span class="s">"age"</span> <span class="o">+</span> <span class="mi">1</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +-------+---------+</span> |
| <span class="c1">// | name|(age + 1)|</span> |
| <span class="c1">// +-------+---------+</span> |
| <span class="c1">// |Michael| null|</span> |
| <span class="c1">// | Andy| 31|</span> |
| <span class="c1">// | Justin| 20|</span> |
| <span class="c1">// +-------+---------+</span> |
| |
| <span class="c1">// Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">$</span><span class="s">"age"</span> <span class="o">></span> <span class="mi">21</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +---+----+</span> |
| <span class="c1">// |age|name|</span> |
| <span class="c1">// +---+----+</span> |
| <span class="c1">// | 30|Andy|</span> |
| <span class="c1">// +---+----+</span> |
| |
| <span class="c1">// Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="n">count</span><span class="o">().</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +----+-----+</span> |
| <span class="c1">// | age|count|</span> |
| <span class="c1">// +----+-----+</span> |
| <span class="c1">// | 19| 1|</span> |
| <span class="c1">// |null| 1|</span> |
| <span class="c1">// | 30| 1|</span> |
| <span class="c1">// +----+-----+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div> |
| |
| <p>For a complete list of the types of operations that can be performed on a Dataset refer to the <a href="api/scala/index.html#org.apache.spark.sql.Dataset">API Documentation</a>.</p> |
| |
| <p>In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/scala/index.html#org.apache.spark.sql.functions$">DataFrame Function Reference</a>.</p> |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><span></span><span class="c1">// col("...") is preferable to df.col("...")</span> |
| <span class="kn">import static</span> <span class="nn">org.apache.spark.sql.functions.col</span><span class="o">;</span> |
| |
| <span class="c1">// Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">printSchema</span><span class="o">();</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: long (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +-------+</span> |
| <span class="c1">// | name|</span> |
| <span class="c1">// +-------+</span> |
| <span class="c1">// |Michael|</span> |
| <span class="c1">// | Andy|</span> |
| <span class="c1">// | Justin|</span> |
| <span class="c1">// +-------+</span> |
| |
| <span class="c1">// Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"name"</span><span class="o">),</span> <span class="n">col</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="na">plus</span><span class="o">(</span><span class="mi">1</span><span class="o">)).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +-------+---------+</span> |
| <span class="c1">// | name|(age + 1)|</span> |
| <span class="c1">// +-------+---------+</span> |
| <span class="c1">// |Michael| null|</span> |
| <span class="c1">// | Andy| 31|</span> |
| <span class="c1">// | Justin| 20|</span> |
| <span class="c1">// +-------+---------+</span> |
| |
| <span class="c1">// Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="na">gt</span><span class="o">(</span><span class="mi">21</span><span class="o">)).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +---+----+</span> |
| <span class="c1">// |age|name|</span> |
| <span class="c1">// +---+----+</span> |
| <span class="c1">// | 30|Andy|</span> |
| <span class="c1">// +---+----+</span> |
| |
| <span class="c1">// Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="na">count</span><span class="o">().</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +----+-----+</span> |
| <span class="c1">// | age|count|</span> |
| <span class="c1">// +----+-----+</span> |
| <span class="c1">// | 19| 1|</span> |
| <span class="c1">// |null| 1|</span> |
| <span class="c1">// | 30| 1|</span> |
| <span class="c1">// +----+-----+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div> |
| |
| <p>For a complete list of the types of operations that can be performed on a Dataset refer to the <a href="api/java/org/apache/spark/sql/Dataset.html">API Documentation</a>.</p> |
| |
| <p>In addition to simple column references and expressions, Datasets also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/java/org/apache/spark/sql/functions.html">DataFrame Function Reference</a>.</p> |
| </div> |
| |
| <div data-lang="python"> |
| <p>In Python, it’s possible to access a DataFrame’s columns either by attribute |
| (<code>df.age</code>) or by indexing (<code>df['age']</code>). While the former is convenient for |
| interactive data exploration, users are highly encouraged to use the |
| latter form, which is future proof and won’t break with column names that |
| are also attributes on the DataFrame class.</p> |
| |
| <div class="highlight"><pre><span></span><span class="c1"># spark, df are from the previous example</span> |
| <span class="c1"># Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| <span class="c1"># root</span> |
| <span class="c1"># |-- age: long (nullable = true)</span> |
| <span class="c1"># |-- name: string (nullable = true)</span> |
| |
| <span class="c1"># Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s2">"name"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +-------+</span> |
| <span class="c1"># | name|</span> |
| <span class="c1"># +-------+</span> |
| <span class="c1"># |Michael|</span> |
| <span class="c1"># | Andy|</span> |
| <span class="c1"># | Justin|</span> |
| <span class="c1"># +-------+</span> |
| |
| <span class="c1"># Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s1">'name'</span><span class="p">],</span> <span class="n">df</span><span class="p">[</span><span class="s1">'age'</span><span class="p">]</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +-------+---------+</span> |
| <span class="c1"># | name|(age + 1)|</span> |
| <span class="c1"># +-------+---------+</span> |
| <span class="c1"># |Michael| null|</span> |
| <span class="c1"># | Andy| 31|</span> |
| <span class="c1"># | Justin| 20|</span> |
| <span class="c1"># +-------+---------+</span> |
| |
| <span class="c1"># Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s1">'age'</span><span class="p">]</span> <span class="o">></span> <span class="mi">21</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +---+----+</span> |
| <span class="c1"># |age|name|</span> |
| <span class="c1"># +---+----+</span> |
| <span class="c1"># | 30|Andy|</span> |
| <span class="c1"># +---+----+</span> |
| |
| <span class="c1"># Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="p">(</span><span class="s2">"age"</span><span class="p">)</span><span class="o">.</span><span class="n">count</span><span class="p">()</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +----+-----+</span> |
| <span class="c1"># | age|count|</span> |
| <span class="c1"># +----+-----+</span> |
| <span class="c1"># | 19| 1|</span> |
| <span class="c1"># |null| 1|</span> |
| <span class="c1"># | 30| 1|</span> |
| <span class="c1"># +----+-----+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div> |
| <p>For a complete list of the types of operations that can be performed on a DataFrame refer to the <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">API Documentation</a>.</p> |
| |
| <p>In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/python/pyspark.sql.html#module-pyspark.sql.functions">DataFrame Function Reference</a>.</p> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><span></span><span class="c1"># Create the DataFrame</span> |
| df <span class="o"><-</span> read.json<span class="p">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c1"># Show the content of the DataFrame</span> |
| <span class="kp">head</span><span class="p">(</span>df<span class="p">)</span> |
| <span class="c1">## age name</span> |
| <span class="c1">## 1 NA Michael</span> |
| <span class="c1">## 2 30 Andy</span> |
| <span class="c1">## 3 19 Justin</span> |
| |
| |
| <span class="c1"># Print the schema in a tree format</span> |
| printSchema<span class="p">(</span>df<span class="p">)</span> |
| <span class="c1">## root</span> |
| <span class="c1">## |-- age: long (nullable = true)</span> |
| <span class="c1">## |-- name: string (nullable = true)</span> |
| |
| <span class="c1"># Select only the "name" column</span> |
| <span class="kp">head</span><span class="p">(</span>select<span class="p">(</span>df<span class="p">,</span> <span class="s">"name"</span><span class="p">))</span> |
| <span class="c1">## name</span> |
| <span class="c1">## 1 Michael</span> |
| <span class="c1">## 2 Andy</span> |
| <span class="c1">## 3 Justin</span> |
| |
| <span class="c1"># Select everybody, but increment the age by 1</span> |
| <span class="kp">head</span><span class="p">(</span>select<span class="p">(</span>df<span class="p">,</span> df<span class="o">$</span>name<span class="p">,</span> df<span class="o">$</span>age <span class="o">+</span> <span class="m">1</span><span class="p">))</span> |
| <span class="c1">## name (age + 1.0)</span> |
| <span class="c1">## 1 Michael NA</span> |
| <span class="c1">## 2 Andy 31</span> |
| <span class="c1">## 3 Justin 20</span> |
| |
| <span class="c1"># Select people older than 21</span> |
| <span class="kp">head</span><span class="p">(</span>where<span class="p">(</span>df<span class="p">,</span> df<span class="o">$</span>age <span class="o">></span> <span class="m">21</span><span class="p">))</span> |
| <span class="c1">## age name</span> |
| <span class="c1">## 1 30 Andy</span> |
| |
| <span class="c1"># Count people by age</span> |
| <span class="kp">head</span><span class="p">(</span>count<span class="p">(</span>groupBy<span class="p">(</span>df<span class="p">,</span> <span class="s">"age"</span><span class="p">)))</span> |
| <span class="c1">## age count</span> |
| <span class="c1">## 1 19 1</span> |
| <span class="c1">## 2 NA 1</span> |
| <span class="c1">## 3 30 1</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div> |
| |
| <p>For a complete list of the types of operations that can be performed on a DataFrame refer to the <a href="api/R/index.html">API Documentation</a>.</p> |
| |
| <p>In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/R/SparkDataFrame.html">DataFrame Function Reference</a>.</p> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="running-sql-queries-programmatically">Running SQL Queries Programmatically</h2> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| <p>The <code>sql</code> function on a <code>SparkSession</code> enables applications to run SQL queries programmatically and returns the result as a <code>DataFrame</code>.</p> |
| |
| <div class="highlight"><pre><span></span><span class="c1">// Register the DataFrame as a SQL temporary view</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="k">val</span> <span class="n">sqlDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT * FROM people"</span><span class="o">)</span> |
| <span class="n">sqlDF</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="java"> |
| <p>The <code>sql</code> function on a <code>SparkSession</code> enables applications to run SQL queries programmatically and returns the result as a <code>Dataset<Row></code>.</p> |
| |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| |
| <span class="c1">// Register the DataFrame as a SQL temporary view</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">sqlDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT * FROM people"</span><span class="o">);</span> |
| <span class="n">sqlDF</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="python"> |
| <p>The <code>sql</code> function on a <code>SparkSession</code> enables applications to run SQL queries programmatically and returns the result as a <code>DataFrame</code>.</p> |
| |
| <div class="highlight"><pre><span></span><span class="c1"># Register the DataFrame as a SQL temporary view</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="p">(</span><span class="s2">"people"</span><span class="p">)</span> |
| |
| <span class="n">sqlDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">"SELECT * FROM people"</span><span class="p">)</span> |
| <span class="n">sqlDF</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +----+-------+</span> |
| <span class="c1"># | age| name|</span> |
| <span class="c1"># +----+-------+</span> |
| <span class="c1"># |null|Michael|</span> |
| <span class="c1"># | 30| Andy|</span> |
| <span class="c1"># | 19| Justin|</span> |
| <span class="c1"># +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="r"> |
| <p>The <code>sql</code> function enables applications to run SQL queries programmatically and returns the result as a <code>SparkDataFrame</code>.</p> |
| |
| <div class="highlight"><pre><span></span>df <span class="o"><-</span> sql<span class="p">(</span><span class="s">"SELECT * FROM table"</span><span class="p">)</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div> |
| |
| </div> |
| </div> |
| |
| <h2 id="global-temporary-view">Global Temporary View</h2> |
| |
<p>Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them
terminates. If you want to have a temporary view that is shared among all sessions and kept alive
until the Spark application terminates, you can create a global temporary view. A global temporary
view is tied to a system-preserved database <code>global_temp</code>, and we must use the qualified name to
refer to it, e.g. <code>SELECT * FROM global_temp.view1</code>.</p>
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| <div class="highlight"><pre><span></span><span class="c1">// Register the DataFrame as a global temporary view</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">createGlobalTempView</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// Global temporary view is tied to a system preserved database `global_temp`</span> |
| <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT * FROM global_temp.people"</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| |
| <span class="c1">// Global temporary view is cross-session</span> |
| <span class="n">spark</span><span class="o">.</span><span class="n">newSession</span><span class="o">().</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT * FROM global_temp.people"</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="java"> |
| <div class="highlight"><pre><span></span><span class="c1">// Register the DataFrame as a global temporary view</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">createGlobalTempView</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// Global temporary view is tied to a system preserved database `global_temp`</span> |
| <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT * FROM global_temp.people"</span><span class="o">).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| |
| <span class="c1">// Global temporary view is cross-session</span> |
| <span class="n">spark</span><span class="o">.</span><span class="na">newSession</span><span class="o">().</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT * FROM global_temp.people"</span><span class="o">).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="python"> |
| <div class="highlight"><pre><span></span><span class="c1"># Register the DataFrame as a global temporary view</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">createGlobalTempView</span><span class="p">(</span><span class="s2">"people"</span><span class="p">)</span> |
| |
| <span class="c1"># Global temporary view is tied to a system preserved database `global_temp`</span> |
| <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">"SELECT * FROM global_temp.people"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +----+-------+</span> |
| <span class="c1"># | age| name|</span> |
| <span class="c1"># +----+-------+</span> |
| <span class="c1"># |null|Michael|</span> |
| <span class="c1"># | 30| Andy|</span> |
| <span class="c1"># | 19| Justin|</span> |
| <span class="c1"># +----+-------+</span> |
| |
| <span class="c1"># Global temporary view is cross-session</span> |
| <span class="n">spark</span><span class="o">.</span><span class="n">newSession</span><span class="p">()</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">"SELECT * FROM global_temp.people"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +----+-------+</span> |
| <span class="c1"># | age| name|</span> |
| <span class="c1"># +----+-------+</span> |
| <span class="c1"># |null|Michael|</span> |
| <span class="c1"># | 30| Andy|</span> |
| <span class="c1"># | 19| Justin|</span> |
| <span class="c1"># +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="sql"> |
| |
| <figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span></span><span class="k">CREATE</span> <span class="k">GLOBAL</span> <span class="k">TEMPORARY</span> <span class="k">VIEW</span> <span class="n">temp_view</span> <span class="k">AS</span> <span class="k">SELECT</span> <span class="n">a</span> <span class="o">+</span> <span class="mi">1</span><span class="p">,</span> <span class="n">b</span> <span class="o">*</span> <span class="mi">2</span> <span class="k">FROM</span> <span class="n">tbl</span> |
| |
| <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">global_temp</span><span class="p">.</span><span class="n">temp_view</span></code></pre></figure> |
| |
| </div> |
| </div> |
| |
| <h2 id="creating-datasets">Creating Datasets</h2> |
| |
<p>Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use
| a specialized <a href="api/scala/index.html#org.apache.spark.sql.Encoder">Encoder</a> to serialize the objects |
| for processing or transmitting over the network. While both encoders and standard serialization are |
| responsible for turning an object into bytes, encoders are code generated dynamically and use a format |
| that allows Spark to perform many operations like filtering, sorting and hashing without deserializing |
| the bytes back into an object.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| <div class="highlight"><pre><span></span><span class="k">case</span> <span class="k">class</span> <span class="nc">Person</span><span class="o">(</span><span class="n">name</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">age</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span> |
| |
| <span class="c1">// Encoders are created for case classes</span> |
| <span class="k">val</span> <span class="n">caseClassDS</span> <span class="k">=</span> <span class="nc">Seq</span><span class="o">(</span><span class="nc">Person</span><span class="o">(</span><span class="s">"Andy"</span><span class="o">,</span> <span class="mi">32</span><span class="o">)).</span><span class="n">toDS</span><span class="o">()</span> |
| <span class="n">caseClassDS</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +----+---+</span> |
| <span class="c1">// |name|age|</span> |
| <span class="c1">// +----+---+</span> |
| <span class="c1">// |Andy| 32|</span> |
| <span class="c1">// +----+---+</span> |
| |
| <span class="c1">// Encoders for most common types are automatically provided by importing spark.implicits._</span> |
| <span class="k">val</span> <span class="n">primitiveDS</span> <span class="k">=</span> <span class="nc">Seq</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">).</span><span class="n">toDS</span><span class="o">()</span> |
| <span class="n">primitiveDS</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span> <span class="o">+</span> <span class="mi">1</span><span class="o">).</span><span class="n">collect</span><span class="o">()</span> <span class="c1">// Returns: Array(2, 3, 4)</span> |
| |
| <span class="c1">// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name</span> |
| <span class="k">val</span> <span class="n">path</span> <span class="k">=</span> <span class="s">"examples/src/main/resources/people.json"</span> |
| <span class="k">val</span> <span class="n">peopleDS</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="n">path</span><span class="o">).</span><span class="n">as</span><span class="o">[</span><span class="kt">Person</span><span class="o">]</span> |
| <span class="n">peopleDS</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="java"> |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.Arrays</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">java.util.Collections</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">java.io.Serializable</span><span class="o">;</span> |
| |
| <span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.MapFunction</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoder</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoders</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Person</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kt">int</span> <span class="n">age</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">getName</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">name</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setName</span><span class="o">(</span><span class="n">String</span> <span class="n">name</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">name</span> <span class="o">=</span> <span class="n">name</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">int</span> <span class="nf">getAge</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">age</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setAge</span><span class="o">(</span><span class="kt">int</span> <span class="n">age</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">age</span> <span class="o">=</span> <span class="n">age</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="c1">// Create an instance of a Bean class</span> |
| <span class="n">Person</span> <span class="n">person</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Person</span><span class="o">();</span> |
| <span class="n">person</span><span class="o">.</span><span class="na">setName</span><span class="o">(</span><span class="s">"Andy"</span><span class="o">);</span> |
| <span class="n">person</span><span class="o">.</span><span class="na">setAge</span><span class="o">(</span><span class="mi">32</span><span class="o">);</span> |
| |
| <span class="c1">// Encoders are created for Java beans</span> |
| <span class="n">Encoder</span><span class="o"><</span><span class="n">Person</span><span class="o">></span> <span class="n">personEncoder</span> <span class="o">=</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">bean</span><span class="o">(</span><span class="n">Person</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Person</span><span class="o">></span> <span class="n">javaBeanDS</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataset</span><span class="o">(</span> |
| <span class="n">Collections</span><span class="o">.</span><span class="na">singletonList</span><span class="o">(</span><span class="n">person</span><span class="o">),</span> |
| <span class="n">personEncoder</span> |
| <span class="o">);</span> |
| <span class="n">javaBeanDS</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +---+----+</span> |
| <span class="c1">// |age|name|</span> |
| <span class="c1">// +---+----+</span> |
| <span class="c1">// | 32|Andy|</span> |
| <span class="c1">// +---+----+</span> |
| |
| <span class="c1">// Encoders for most common types are provided in class Encoders</span> |
| <span class="n">Encoder</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">integerEncoder</span> <span class="o">=</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">INT</span><span class="o">();</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">primitiveDS</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataset</span><span class="o">(</span><span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">),</span> <span class="n">integerEncoder</span><span class="o">);</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">transformedDS</span> <span class="o">=</span> <span class="n">primitiveDS</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> |
| <span class="o">(</span><span class="n">MapFunction</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>)</span> <span class="n">value</span> <span class="o">-></span> <span class="n">value</span> <span class="o">+</span> <span class="mi">1</span><span class="o">,</span> |
| <span class="n">integerEncoder</span><span class="o">);</span> |
| <span class="n">transformedDS</span><span class="o">.</span><span class="na">collect</span><span class="o">();</span> <span class="c1">// Returns [2, 3, 4]</span> |
| |
| <span class="c1">// DataFrames can be converted to a Dataset by providing a class. Mapping based on name</span> |
| <span class="n">String</span> <span class="n">path</span> <span class="o">=</span> <span class="s">"examples/src/main/resources/people.json"</span><span class="o">;</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Person</span><span class="o">></span> <span class="n">peopleDS</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="n">path</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="n">personEncoder</span><span class="o">);</span> |
| <span class="n">peopleDS</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// | age| name|</span> |
| <span class="c1">// +----+-------+</span> |
| <span class="c1">// |null|Michael|</span> |
| <span class="c1">// | 30| Andy|</span> |
| <span class="c1">// | 19| Justin|</span> |
| <span class="c1">// +----+-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <h2 id="interoperating-with-rdds">Interoperating with RDDs</h2> |
| |
| <p>Spark SQL supports two different methods for converting existing RDDs into Datasets. The first |
| method uses reflection to infer the schema of an RDD that contains specific types of objects. This |
| reflection-based approach leads to more concise code and works well when you already know the schema |
| while writing your Spark application.</p> |
| |
| <p>The second method for creating Datasets is through a programmatic interface that allows you to |
| construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows |
| you to construct Datasets when the columns and their types are not known until runtime.</p> |
| |
| <h3 id="inferring-the-schema-using-reflection">Inferring the Schema Using Reflection</h3> |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <p>The Scala interface for Spark SQL supports automatically converting an RDD containing case classes |
| to a DataFrame. The case class |
| defines the schema of the table. The names of the arguments to the case class are read using |
| reflection and become the names of the columns. Case classes can also be nested or contain complex |
| types such as <code>Seq</code>s or <code>Array</code>s. This RDD can be implicitly converted to a DataFrame and then be |
| registered as a table. Tables can be used in subsequent SQL statements.</p> |
| |
| <div class="highlight"><pre><span></span><span class="c1">// For implicit conversions from RDDs to DataFrames</span> |
| <span class="k">import</span> <span class="nn">spark.implicits._</span> |
| |
<span class="c1">// Create an RDD of Person objects from a text file, convert it to a DataFrame</span>
| <span class="k">val</span> <span class="n">peopleDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span> |
| <span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">)</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">))</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">attributes</span> <span class="k">=></span> <span class="nc">Person</span><span class="o">(</span><span class="n">attributes</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">attributes</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">trim</span><span class="o">.</span><span class="n">toInt</span><span class="o">))</span> |
| <span class="o">.</span><span class="n">toDF</span><span class="o">()</span> |
| <span class="c1">// Register the DataFrame as a temporary view</span> |
| <span class="n">peopleDF</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by Spark</span> |
| <span class="k">val</span> <span class="n">teenagersDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name, age FROM people WHERE age BETWEEN 13 AND 19"</span><span class="o">)</span> |
| |
| <span class="c1">// The columns of a row in the result can be accessed by field index</span> |
| <span class="n">teenagersDF</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">teenager</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">teenager</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +------------+</span> |
| <span class="c1">// | value|</span> |
| <span class="c1">// +------------+</span> |
| <span class="c1">// |Name: Justin|</span> |
| <span class="c1">// +------------+</span> |
| |
| <span class="c1">// or by field name</span> |
| <span class="n">teenagersDF</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">teenager</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">teenager</span><span class="o">.</span><span class="n">getAs</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">"name"</span><span class="o">)).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +------------+</span> |
| <span class="c1">// | value|</span> |
| <span class="c1">// +------------+</span> |
| <span class="c1">// |Name: Justin|</span> |
| <span class="c1">// +------------+</span> |
| |
| <span class="c1">// No pre-defined encoders for Dataset[Map[K,V]], define explicitly</span> |
| <span class="k">implicit</span> <span class="k">val</span> <span class="n">mapEncoder</span> <span class="k">=</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">Encoders</span><span class="o">.</span><span class="n">kryo</span><span class="o">[</span><span class="kt">Map</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Any</span><span class="o">]]</span> |
| <span class="c1">// Primitive types and case classes can be also defined as</span> |
| <span class="c1">// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()</span> |
| |
| <span class="c1">// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]</span> |
| <span class="n">teenagersDF</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">teenager</span> <span class="k">=></span> <span class="n">teenager</span><span class="o">.</span><span class="n">getValuesMap</span><span class="o">[</span><span class="kt">Any</span><span class="o">](</span><span class="nc">List</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"age"</span><span class="o">))).</span><span class="n">collect</span><span class="o">()</span> |
| <span class="c1">// Array(Map("name" -> "Justin", "age" -> 19))</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>Spark SQL supports automatically converting an RDD of |
| <a href="https://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly">JavaBeans</a> into a DataFrame. |
| The <code>BeanInfo</code>, obtained using reflection, defines the schema of the table. Currently, Spark SQL |
| does not support JavaBeans that contain <code>Map</code> field(s). Nested JavaBeans and <code>List</code> or <code>Array</code> |
| fields are supported though. You can create a JavaBean by creating a class that implements |
| Serializable and has getters and setters for all of its fields.</p> |
| |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.api.java.JavaRDD</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.Function</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.MapFunction</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoder</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoders</span><span class="o">;</span> |
| |
| <span class="c1">// Create an RDD of Person objects from a text file</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">Person</span><span class="o">></span> <span class="n">peopleRDD</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">javaRDD</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">line</span> <span class="o">-></span> <span class="o">{</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">parts</span> <span class="o">=</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span> |
| <span class="n">Person</span> <span class="n">person</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Person</span><span class="o">();</span> |
| <span class="n">person</span><span class="o">.</span><span class="na">setName</span><span class="o">(</span><span class="n">parts</span><span class="o">[</span><span class="mi">0</span><span class="o">]);</span> |
| <span class="n">person</span><span class="o">.</span><span class="na">setAge</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">parts</span><span class="o">[</span><span class="mi">1</span><span class="o">].</span><span class="na">trim</span><span class="o">()));</span> |
| <span class="k">return</span> <span class="n">person</span><span class="o">;</span> |
| <span class="o">});</span> |
| |
| <span class="c1">// Apply a schema to an RDD of JavaBeans to get a DataFrame</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">peopleDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">peopleRDD</span><span class="o">,</span> <span class="n">Person</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| <span class="c1">// Register the DataFrame as a temporary view</span> |
| <span class="n">peopleDF</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by Spark</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">teenagersDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people WHERE age BETWEEN 13 AND 19"</span><span class="o">);</span> |
| |
| <span class="c1">// The columns of a row in the result can be accessed by field index</span> |
| <span class="n">Encoder</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">stringEncoder</span> <span class="o">=</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">STRING</span><span class="o">();</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">teenagerNamesByIndexDF</span> <span class="o">=</span> <span class="n">teenagersDF</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> |
| <span class="o">(</span><span class="n">MapFunction</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>)</span> <span class="n">row</span> <span class="o">-></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> |
| <span class="n">stringEncoder</span><span class="o">);</span> |
| <span class="n">teenagerNamesByIndexDF</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +------------+</span> |
| <span class="c1">// | value|</span> |
| <span class="c1">// +------------+</span> |
| <span class="c1">// |Name: Justin|</span> |
| <span class="c1">// +------------+</span> |
| |
| <span class="c1">// or by field name</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">teenagerNamesByFieldDF</span> <span class="o">=</span> <span class="n">teenagersDF</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> |
| <span class="o">(</span><span class="n">MapFunction</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>)</span> <span class="n">row</span> <span class="o">-></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.<</span><span class="n">String</span><span class="o">></span><span class="n">getAs</span><span class="o">(</span><span class="s">"name"</span><span class="o">),</span> |
| <span class="n">stringEncoder</span><span class="o">);</span> |
| <span class="n">teenagerNamesByFieldDF</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +------------+</span> |
| <span class="c1">// | value|</span> |
| <span class="c1">// +------------+</span> |
| <span class="c1">// |Name: Justin|</span> |
| <span class="c1">// +------------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of |
| key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, |
| and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.</p> |
| |
| <div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">Row</span> |
| |
| <span class="n">sc</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span> |
| |
| <span class="c1"># Load a text file and convert each line to a Row.</span> |
| <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s2">"examples/src/main/resources/people.txt"</span><span class="p">)</span> |
| <span class="n">parts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">l</span><span class="p">:</span> <span class="n">l</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">","</span><span class="p">))</span> |
| <span class="n">people</span> <span class="o">=</span> <span class="n">parts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">p</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">age</span><span class="o">=</span><span class="nb">int</span><span class="p">(</span><span class="n">p</span><span class="p">[</span><span class="mi">1</span><span class="p">])))</span> |
| |
| <span class="c1"># Infer the schema, and register the DataFrame as a table.</span> |
| <span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">people</span><span class="p">)</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="p">(</span><span class="s2">"people"</span><span class="p">)</span> |
| |
| <span class="c1"># SQL can be run over DataFrames that have been registered as a table.</span> |
| <span class="n">teenagers</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="p">)</span> |
| |
| <span class="c1"># The results of SQL queries are DataFrame objects.</span> |
| <span class="c1"># rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.</span> |
| <span class="n">teenNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="n">rdd</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="s2">"Name: "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="n">name</span><span class="p">)</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">teenNames</span><span class="p">:</span> |
| <span class="k">print</span><span class="p">(</span><span class="n">name</span><span class="p">)</span> |
| <span class="c1"># Name: Justin</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div> |
| </div> |
| |
| </div> |
| |
| <h3 id="programmatically-specifying-the-schema">Programmatically Specifying the Schema</h3> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <p>When case classes cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed |
| and fields will be projected differently for different users), |
| a <code>DataFrame</code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of <code>Row</code>s from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| <code>Row</code>s in the RDD created in Step 1.</li> |
| <li>Apply the schema to the RDD of <code>Row</code>s via <code>createDataFrame</code> method provided |
| by <code>SparkSession</code>.</li> |
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.types._</span> |
| |
| <span class="c1">// Create an RDD</span> |
| <span class="k">val</span> <span class="n">peopleRDD</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">)</span> |
| |
| <span class="c1">// The schema is encoded in a string</span> |
| <span class="k">val</span> <span class="n">schemaString</span> <span class="k">=</span> <span class="s">"name age"</span> |
| |
| <span class="c1">// Generate the schema based on the string of schema</span> |
| <span class="k">val</span> <span class="n">fields</span> <span class="k">=</span> <span class="n">schemaString</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">fieldName</span> <span class="k">=></span> <span class="nc">StructField</span><span class="o">(</span><span class="n">fieldName</span><span class="o">,</span> <span class="nc">StringType</span><span class="o">,</span> <span class="n">nullable</span> <span class="k">=</span> <span class="kc">true</span><span class="o">))</span> |
| <span class="k">val</span> <span class="n">schema</span> <span class="k">=</span> <span class="nc">StructType</span><span class="o">(</span><span class="n">fields</span><span class="o">)</span> |
| |
| <span class="c1">// Convert records of the RDD (people) to Rows</span> |
| <span class="k">val</span> <span class="n">rowRDD</span> <span class="k">=</span> <span class="n">peopleRDD</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">))</span> |
| <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">attributes</span> <span class="k">=></span> <span class="nc">Row</span><span class="o">(</span><span class="n">attributes</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">attributes</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">trim</span><span class="o">))</span> |
| |
| <span class="c1">// Apply the schema to the RDD</span> |
| <span class="k">val</span> <span class="n">peopleDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="o">(</span><span class="n">rowRDD</span><span class="o">,</span> <span class="n">schema</span><span class="o">)</span> |
| |
| <span class="c1">// Creates a temporary view using the DataFrame</span> |
| <span class="n">peopleDF</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// SQL can be run over a temporary view created using DataFrames</span> |
| <span class="k">val</span> <span class="n">results</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people"</span><span class="o">)</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations</span> |
| <span class="c1">// The columns of a row in the result can be accessed by field index or by field name</span> |
| <span class="n">results</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">attributes</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">attributes</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +-------------+</span> |
| <span class="c1">// | value|</span> |
| <span class="c1">// +-------------+</span> |
| <span class="c1">// |Name: Michael|</span> |
| <span class="c1">// | Name: Andy|</span> |
| <span class="c1">// | Name: Justin|</span> |
| <span class="c1">// +-------------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>When JavaBean classes cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed and |
| fields will be projected differently for different users), |
| a <code>Dataset<Row></code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of <code>Row</code>s from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| <code>Row</code>s in the RDD created in Step 1.</li> |
| <li>Apply the schema to the RDD of <code>Row</code>s via <code>createDataFrame</code> method provided |
| by <code>SparkSession</code>.</li> |
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.ArrayList</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span> |
| |
| <span class="kn">import</span> <span class="nn">org.apache.spark.api.java.JavaRDD</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.Function</span><span class="o">;</span> |
| |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.DataTypes</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructField</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructType</span><span class="o">;</span> |
| |
| <span class="c1">// Create an RDD</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">peopleRDD</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sparkContext</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">toJavaRDD</span><span class="o">();</span> |
| |
| <span class="c1">// The schema is encoded in a string</span> |
| <span class="n">String</span> <span class="n">schemaString</span> <span class="o">=</span> <span class="s">"name age"</span><span class="o">;</span> |
| |
| <span class="c1">// Generate the schema based on the string of schema</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">StructField</span><span class="o">></span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><>();</span> |
| <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">fieldName</span> <span class="o">:</span> <span class="n">schemaString</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> <span class="o">{</span> |
| <span class="n">StructField</span> <span class="n">field</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="n">fieldName</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">StringType</span><span class="o">,</span> <span class="kc">true</span><span class="o">);</span> |
| <span class="n">fields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">field</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="n">StructType</span> <span class="n">schema</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructType</span><span class="o">(</span><span class="n">fields</span><span class="o">);</span> |
| |
| <span class="c1">// Convert records of the RDD (people) to Rows</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">rowRDD</span> <span class="o">=</span> <span class="n">peopleRDD</span><span class="o">.</span><span class="na">map</span><span class="o">((</span><span class="n">Function</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Row</span><span class="o">>)</span> <span class="n">record</span> <span class="o">-></span> <span class="o">{</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">attributes</span> <span class="o">=</span> <span class="n">record</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span> |
| <span class="k">return</span> <span class="n">RowFactory</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">attributes</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="n">attributes</span><span class="o">[</span><span class="mi">1</span><span class="o">].</span><span class="na">trim</span><span class="o">());</span> |
| <span class="o">});</span> |
| |
| <span class="c1">// Apply the schema to the RDD</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">peopleDataFrame</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">rowRDD</span><span class="o">,</span> <span class="n">schema</span><span class="o">);</span> |
| |
| <span class="c1">// Creates a temporary view using the DataFrame</span> |
| <span class="n">peopleDataFrame</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// SQL can be run over a temporary view created using DataFrames</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">results</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people"</span><span class="o">);</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations</span> |
| <span class="c1">// The columns of a row in the result can be accessed by field index or by field name</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">namesDS</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> |
| <span class="o">(</span><span class="n">MapFunction</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>)</span> <span class="n">row</span> <span class="o">-></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> |
| <span class="n">Encoders</span><span class="o">.</span><span class="na">STRING</span><span class="o">());</span> |
| <span class="n">namesDS</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +-------------+</span> |
| <span class="c1">// | value|</span> |
| <span class="c1">// +-------------+</span> |
| <span class="c1">// |Name: Michael|</span> |
| <span class="c1">// | Name: Andy|</span> |
| <span class="c1">// | Name: Justin|</span> |
| <span class="c1">// +-------------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" in the Spark repo.</small></div> |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>When a dictionary of kwargs cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed and |
| fields will be projected differently for different users), |
| a <code>DataFrame</code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of tuples or lists from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| tuples or lists in the RDD created in Step 1.</li> |
| <li>Apply the schema to the RDD via <code>createDataFrame</code> method provided by <code>SparkSession</code>.</li> |
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><span></span><span class="c1"># Import data types</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="o">*</span> |
| |
| <span class="n">sc</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span> |
| |
| <span class="c1"># Load a text file and convert each line to a Row.</span> |
| <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s2">"examples/src/main/resources/people.txt"</span><span class="p">)</span> |
| <span class="n">parts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">l</span><span class="p">:</span> <span class="n">l</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s2">","</span><span class="p">))</span> |
| <span class="c1"># Each line is converted to a tuple.</span> |
| <span class="n">people</span> <span class="o">=</span> <span class="n">parts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="p">(</span><span class="n">p</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">p</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">strip</span><span class="p">()))</span> |
| |
| <span class="c1"># The schema is encoded in a string.</span> |
| <span class="n">schemaString</span> <span class="o">=</span> <span class="s2">"name age"</span> |
| |
| <span class="n">fields</span> <span class="o">=</span> <span class="p">[</span><span class="n">StructField</span><span class="p">(</span><span class="n">field_name</span><span class="p">,</span> <span class="n">StringType</span><span class="p">(),</span> <span class="bp">True</span><span class="p">)</span> <span class="k">for</span> <span class="n">field_name</span> <span class="ow">in</span> <span class="n">schemaString</span><span class="o">.</span><span class="n">split</span><span class="p">()]</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">(</span><span class="n">fields</span><span class="p">)</span> |
| |
| <span class="c1"># Apply the schema to the RDD.</span> |
| <span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">people</span><span class="p">,</span> <span class="n">schema</span><span class="p">)</span> |
| |
| <span class="c1"># Creates a temporary view using the DataFrame</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="p">(</span><span class="s2">"people"</span><span class="p">)</span> |
| |
| <span class="c1"># SQL can be run over DataFrames that have been registered as a table.</span> |
| <span class="n">results</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">"SELECT name FROM people"</span><span class="p">)</span> |
| |
| <span class="n">results</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c1"># +-------+</span> |
| <span class="c1"># | name|</span> |
| <span class="c1"># +-------+</span> |
| <span class="c1"># |Michael|</span> |
| <span class="c1"># | Andy|</span> |
| <span class="c1"># | Justin|</span> |
| <span class="c1"># +-------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.</small></div> |
| </div> |
| |
| </div> |
| |
| <h2 id="aggregations">Aggregations</h2> |
| |
| <p>The <a href="api/scala/index.html#org.apache.spark.sql.functions$">built-in DataFrame functions</a> provide common |
| aggregations such as <code>count()</code>, <code>countDistinct()</code>, <code>avg()</code>, <code>max()</code>, <code>min()</code>, etc. |
| While those functions are designed for DataFrames, Spark SQL also has type-safe versions for some of them in |
| <a href="api/scala/index.html#org.apache.spark.sql.expressions.scalalang.typed$">Scala</a> and |
| <a href="api/java/org/apache/spark/sql/expressions/javalang/typed.html">Java</a> to work with strongly typed Datasets. |
| Moreover, users are not limited to the predefined aggregate functions and can create their own.</p> |
| |
| <h3 id="untyped-user-defined-aggregate-functions">Untyped User-Defined Aggregate Functions</h3> |
| <p>Users have to extend the <a href="api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction">UserDefinedAggregateFunction</a> |
| abstract class to implement a custom untyped aggregate function. For example, a user-defined average |
| can look like:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| <div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.</span><span class="o">{</span><span class="nc">Row</span><span class="o">,</span> <span class="nc">SparkSession</span><span class="o">}</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.sql.expressions.MutableAggregationBuffer</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.sql.expressions.UserDefinedAggregateFunction</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.sql.types._</span> |
| |
| <span class="k">object</span> <span class="nc">MyAverage</span> <span class="k">extends</span> <span class="nc">UserDefinedAggregateFunction</span> <span class="o">{</span> |
| <span class="c1">// Data types of input arguments of this aggregate function</span> |
| <span class="k">def</span> <span class="n">inputSchema</span><span class="k">:</span> <span class="kt">StructType</span> <span class="o">=</span> <span class="nc">StructType</span><span class="o">(</span><span class="nc">StructField</span><span class="o">(</span><span class="s">"inputColumn"</span><span class="o">,</span> <span class="nc">LongType</span><span class="o">)</span> <span class="o">::</span> <span class="nc">Nil</span><span class="o">)</span> |
| <span class="c1">// Data types of values in the aggregation buffer</span> |
| <span class="k">def</span> <span class="n">bufferSchema</span><span class="k">:</span> <span class="kt">StructType</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="nc">StructType</span><span class="o">(</span><span class="nc">StructField</span><span class="o">(</span><span class="s">"sum"</span><span class="o">,</span> <span class="nc">LongType</span><span class="o">)</span> <span class="o">::</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">"count"</span><span class="o">,</span> <span class="nc">LongType</span><span class="o">)</span> <span class="o">::</span> <span class="nc">Nil</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="c1">// The data type of the returned value</span> |
| <span class="k">def</span> <span class="n">dataType</span><span class="k">:</span> <span class="kt">DataType</span> <span class="o">=</span> <span class="nc">DoubleType</span> |
| <span class="c1">// Whether this function always returns the same output on the identical input</span> |
| <span class="k">def</span> <span class="n">deterministic</span><span class="k">:</span> <span class="kt">Boolean</span> <span class="o">=</span> <span class="kc">true</span> |
| <span class="c1">// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to</span> |
| <span class="c1">// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides</span> |
| <span class="c1">// the opportunity to update its values. Note that arrays and maps inside the buffer are still</span> |
| <span class="c1">// immutable.</span> |
| <span class="k">def</span> <span class="n">initialize</span><span class="o">(</span><span class="n">buffer</span><span class="k">:</span> <span class="kt">MutableAggregationBuffer</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">buffer</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="k">=</span> <span class="mi">0L</span> |
| <span class="n">buffer</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="k">=</span> <span class="mi">0L</span> |
| <span class="o">}</span> |
| <span class="c1">// Updates the given aggregation buffer `buffer` with new input data from `input`</span> |
| <span class="k">def</span> <span class="n">update</span><span class="o">(</span><span class="n">buffer</span><span class="k">:</span> <span class="kt">MutableAggregationBuffer</span><span class="o">,</span> <span class="n">input</span><span class="k">:</span> <span class="kt">Row</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">if</span> <span class="o">(!</span><span class="n">input</span><span class="o">.</span><span class="n">isNullAt</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="o">{</span> |
| <span class="n">buffer</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="k">=</span> <span class="n">buffer</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="n">input</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> |
| <span class="n">buffer</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="k">=</span> <span class="n">buffer</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">+</span> <span class="mi">1</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="c1">// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`</span> |
| <span class="k">def</span> <span class="n">merge</span><span class="o">(</span><span class="n">buffer1</span><span class="k">:</span> <span class="kt">MutableAggregationBuffer</span><span class="o">,</span> <span class="n">buffer2</span><span class="k">:</span> <span class="kt">Row</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">buffer1</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="k">=</span> <span class="n">buffer1</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="n">buffer2</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> |
| <span class="n">buffer1</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="k">=</span> <span class="n">buffer1</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">+</span> <span class="n">buffer2</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="c1">// Calculates the final result</span> |
| <span class="k">def</span> <span class="n">evaluate</span><span class="o">(</span><span class="n">buffer</span><span class="k">:</span> <span class="kt">Row</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">toDouble</span> <span class="o">/</span> <span class="n">buffer</span><span class="o">.</span><span class="n">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> |
| <span class="o">}</span> |
| |
| <span class="c1">// Register the function to access it</span> |
| <span class="n">spark</span><span class="o">.</span><span class="n">udf</span><span class="o">.</span><span class="n">register</span><span class="o">(</span><span class="s">"myAverage"</span><span class="o">,</span> <span class="nc">MyAverage</span><span class="o">)</span> |
| |
| <span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/employees.json"</span><span class="o">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">"employees"</span><span class="o">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +-------+------+</span> |
| <span class="c1">// | name|salary|</span> |
| <span class="c1">// +-------+------+</span> |
| <span class="c1">// |Michael| 3000|</span> |
| <span class="c1">// | Andy| 4500|</span> |
| <span class="c1">// | Justin| 3500|</span> |
| <span class="c1">// | Berta| 4000|</span> |
| <span class="c1">// +-------+------+</span> |
| |
| <span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT myAverage(salary) as average_salary FROM employees"</span><span class="o">)</span> |
| <span class="n">result</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +--------------+</span> |
| <span class="c1">// |average_salary|</span> |
| <span class="c1">// +--------------+</span> |
| <span class="c1">// | 3750.0|</span> |
| <span class="c1">// +--------------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.</small></div> |
| </div> |
| <div data-lang="java"> |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.util.ArrayList</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span> |
| |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.expressions.MutableAggregationBuffer</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.expressions.UserDefinedAggregateFunction</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.DataType</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.DataTypes</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructField</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructType</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">MyAverage</span> <span class="kd">extends</span> <span class="n">UserDefinedAggregateFunction</span> <span class="o">{</span> |
| |
| <span class="kd">private</span> <span class="n">StructType</span> <span class="n">inputSchema</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="n">StructType</span> <span class="n">bufferSchema</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="nf">MyAverage</span><span class="o">()</span> <span class="o">{</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">StructField</span><span class="o">></span> <span class="n">inputFields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><>();</span> |
| <span class="n">inputFields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="s">"inputColumn"</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">LongType</span><span class="o">,</span> <span class="kc">true</span><span class="o">));</span> |
| <span class="n">inputSchema</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructType</span><span class="o">(</span><span class="n">inputFields</span><span class="o">);</span> |
| |
| <span class="n">List</span><span class="o"><</span><span class="n">StructField</span><span class="o">></span> <span class="n">bufferFields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><>();</span> |
| <span class="n">bufferFields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="s">"sum"</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">LongType</span><span class="o">,</span> <span class="kc">true</span><span class="o">));</span> |
| <span class="n">bufferFields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="s">"count"</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">LongType</span><span class="o">,</span> <span class="kc">true</span><span class="o">));</span> |
| <span class="n">bufferSchema</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructType</span><span class="o">(</span><span class="n">bufferFields</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="c1">// Data types of input arguments of this aggregate function</span> |
| <span class="kd">public</span> <span class="n">StructType</span> <span class="nf">inputSchema</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">inputSchema</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="c1">// Data types of values in the aggregation buffer</span> |
| <span class="kd">public</span> <span class="n">StructType</span> <span class="nf">bufferSchema</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">bufferSchema</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="c1">// The data type of the returned value</span> |
| <span class="kd">public</span> <span class="n">DataType</span> <span class="nf">dataType</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">DoubleType</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="c1">// Whether this function always returns the same output on the identical input</span> |
| <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">deterministic</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="kc">true</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="c1">// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to</span> |
| <span class="c1">// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides</span> |
| <span class="c1">// the opportunity to update its values. Note that arrays and maps inside the buffer are still</span> |
| <span class="c1">// immutable.</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">initialize</span><span class="o">(</span><span class="n">MutableAggregationBuffer</span> <span class="n">buffer</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">buffer</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">0</span><span class="n">L</span><span class="o">);</span> |
| <span class="n">buffer</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">0</span><span class="n">L</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="c1">// Updates the given aggregation buffer `buffer` with new input data from `input`</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">update</span><span class="o">(</span><span class="n">MutableAggregationBuffer</span> <span class="n">buffer</span><span class="o">,</span> <span class="n">Row</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">if</span> <span class="o">(!</span><span class="n">input</span><span class="o">.</span><span class="na">isNullAt</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="o">{</span> |
| <span class="kt">long</span> <span class="n">updatedSum</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="n">input</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> |
| <span class="kt">long</span> <span class="n">updatedCount</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span> |
| <span class="n">buffer</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">updatedSum</span><span class="o">);</span> |
| <span class="n">buffer</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">updatedCount</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="c1">// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">merge</span><span class="o">(</span><span class="n">MutableAggregationBuffer</span> <span class="n">buffer1</span><span class="o">,</span> <span class="n">Row</span> <span class="n">buffer2</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kt">long</span> <span class="n">mergedSum</span> <span class="o">=</span> <span class="n">buffer1</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="n">buffer2</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> |
| <span class="kt">long</span> <span class="n">mergedCount</span> <span class="o">=</span> <span class="n">buffer1</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span> <span class="o">+</span> <span class="n">buffer2</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> |
| <span class="n">buffer1</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">mergedSum</span><span class="o">);</span> |
| <span class="n">buffer1</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">mergedCount</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="c1">// Calculates the final result</span> |
| <span class="kd">public</span> <span class="n">Double</span> <span class="nf">evaluate</span><span class="o">(</span><span class="n">Row</span> <span class="n">buffer</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="o">((</span><span class="kt">double</span><span class="o">)</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="o">/</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="c1">// Register the function to access it</span> |
| <span class="n">spark</span><span class="o">.</span><span class="na">udf</span><span class="o">().</span><span class="na">register</span><span class="o">(</span><span class="s">"myAverage"</span><span class="o">,</span> <span class="k">new</span> <span class="n">MyAverage</span><span class="o">());</span> |
| |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/employees.json"</span><span class="o">);</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">"employees"</span><span class="o">);</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +-------+------+</span> |
| <span class="c1">// | name|salary|</span> |
| <span class="c1">// +-------+------+</span> |
| <span class="c1">// |Michael| 3000|</span> |
| <span class="c1">// | Andy| 4500|</span> |
| <span class="c1">// | Justin| 3500|</span> |
| <span class="c1">// | Berta| 4000|</span> |
| <span class="c1">// +-------+------+</span> |
| |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">result</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT myAverage(salary) as average_salary FROM employees"</span><span class="o">);</span> |
| <span class="n">result</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +--------------+</span> |
| <span class="c1">// |average_salary|</span> |
| <span class="c1">// +--------------+</span> |
| <span class="c1">// | 3750.0|</span> |
| <span class="c1">// +--------------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedUntypedAggregation.java" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| <h3 id="type-safe-user-defined-aggregate-functions">Type-Safe User-Defined Aggregate Functions</h3> |
| |
| <p>User-defined aggregations for strongly typed Datasets revolve around the <a href="api/scala/index.html#org.apache.spark.sql.expressions.Aggregator">Aggregator</a> abstract class. |
| For example, a type-safe user-defined average can look like:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| <div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.</span><span class="o">{</span><span class="nc">Encoder</span><span class="o">,</span> <span class="nc">Encoders</span><span class="o">,</span> <span class="nc">SparkSession</span><span class="o">}</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.sql.expressions.Aggregator</span> |
| |
| <span class="k">case</span> <span class="k">class</span> <span class="nc">Employee</span><span class="o">(</span><span class="n">name</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">salary</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span> |
| <span class="k">case</span> <span class="k">class</span> <span class="nc">Average</span><span class="o">(</span><span class="k">var</span> <span class="n">sum</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="k">var</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span> |
| |
| <span class="k">object</span> <span class="nc">MyAverage</span> <span class="k">extends</span> <span class="nc">Aggregator</span><span class="o">[</span><span class="kt">Employee</span>, <span class="kt">Average</span>, <span class="kt">Double</span><span class="o">]</span> <span class="o">{</span> |
| <span class="c1">// A zero value for this aggregation. Should satisfy the property that any b + zero = b</span> |
| <span class="k">def</span> <span class="n">zero</span><span class="k">:</span> <span class="kt">Average</span> <span class="o">=</span> <span class="nc">Average</span><span class="o">(</span><span class="mi">0L</span><span class="o">,</span> <span class="mi">0L</span><span class="o">)</span> |
| <span class="c1">// Combine two values to produce a new value. For performance, the function may modify `buffer`</span> |
| <span class="c1">// and return it instead of constructing a new object</span> |
| <span class="k">def</span> <span class="n">reduce</span><span class="o">(</span><span class="n">buffer</span><span class="k">:</span> <span class="kt">Average</span><span class="o">,</span> <span class="n">employee</span><span class="k">:</span> <span class="kt">Employee</span><span class="o">)</span><span class="k">:</span> <span class="kt">Average</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">buffer</span><span class="o">.</span><span class="n">sum</span> <span class="o">+=</span> <span class="n">employee</span><span class="o">.</span><span class="n">salary</span> |
| <span class="n">buffer</span><span class="o">.</span><span class="n">count</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="n">buffer</span> |
| <span class="o">}</span> |
| <span class="c1">// Merge two intermediate values</span> |
| <span class="k">def</span> <span class="n">merge</span><span class="o">(</span><span class="n">b1</span><span class="k">:</span> <span class="kt">Average</span><span class="o">,</span> <span class="n">b2</span><span class="k">:</span> <span class="kt">Average</span><span class="o">)</span><span class="k">:</span> <span class="kt">Average</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="n">b1</span><span class="o">.</span><span class="n">sum</span> <span class="o">+=</span> <span class="n">b2</span><span class="o">.</span><span class="n">sum</span> |
| <span class="n">b1</span><span class="o">.</span><span class="n">count</span> <span class="o">+=</span> <span class="n">b2</span><span class="o">.</span><span class="n">count</span> |
| <span class="n">b1</span> |
| <span class="o">}</span> |
| <span class="c1">// Transform the output of the reduction</span> |
| <span class="k">def</span> <span class="n">finish</span><span class="o">(</span><span class="n">reduction</span><span class="k">:</span> <span class="kt">Average</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="n">reduction</span><span class="o">.</span><span class="n">sum</span><span class="o">.</span><span class="n">toDouble</span> <span class="o">/</span> <span class="n">reduction</span><span class="o">.</span><span class="n">count</span> |
| <span class="c1">// Specifies the Encoder for the intermediate value type</span> |
| <span class="k">def</span> <span class="n">bufferEncoder</span><span class="k">:</span> <span class="kt">Encoder</span><span class="o">[</span><span class="kt">Average</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Encoders</span><span class="o">.</span><span class="n">product</span> |
| <span class="c1">// Specifies the Encoder for the final output value type</span> |
| <span class="k">def</span> <span class="n">outputEncoder</span><span class="k">:</span> <span class="kt">Encoder</span><span class="o">[</span><span class="kt">Double</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Encoders</span><span class="o">.</span><span class="n">scalaDouble</span> |
| <span class="o">}</span> |
| |
| <span class="k">val</span> <span class="n">ds</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/employees.json"</span><span class="o">).</span><span class="n">as</span><span class="o">[</span><span class="kt">Employee</span><span class="o">]</span> |
| <span class="n">ds</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +-------+------+</span> |
| <span class="c1">// | name|salary|</span> |
| <span class="c1">// +-------+------+</span> |
| <span class="c1">// |Michael| 3000|</span> |
| <span class="c1">// | Andy| 4500|</span> |
| <span class="c1">// | Justin| 3500|</span> |
| <span class="c1">// | Berta| 4000|</span> |
| <span class="c1">// +-------+------+</span> |
| |
| <span class="c1">// Convert the function to a `TypedColumn` and give it a name</span> |
| <span class="k">val</span> <span class="n">averageSalary</span> <span class="k">=</span> <span class="nc">MyAverage</span><span class="o">.</span><span class="n">toColumn</span><span class="o">.</span><span class="n">name</span><span class="o">(</span><span class="s">"average_salary"</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">ds</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="n">averageSalary</span><span class="o">)</span> |
| <span class="n">result</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// +--------------+</span> |
| <span class="c1">// |average_salary|</span> |
| <span class="c1">// +--------------+</span> |
| <span class="c1">// | 3750.0|</span> |
| <span class="c1">// +--------------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.</small></div> |
| </div> |
| <div data-lang="java"> |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.io.Serializable</span><span class="o">;</span> |
| |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoder</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoders</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.TypedColumn</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.expressions.Aggregator</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Employee</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kt">long</span> <span class="n">salary</span><span class="o">;</span> |
| |
| <span class="c1">// Constructors, getters, setters...</span> |
| |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Average</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kt">long</span> <span class="n">sum</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kt">long</span> <span class="n">count</span><span class="o">;</span> |
| |
| <span class="c1">// Constructors, getters, setters...</span> |
| |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">MyAverage</span> <span class="kd">extends</span> <span class="n">Aggregator</span><span class="o"><</span><span class="n">Employee</span><span class="o">,</span> <span class="n">Average</span><span class="o">,</span> <span class="n">Double</span><span class="o">></span> <span class="o">{</span> |
| <span class="c1">// A zero value for this aggregation. Should satisfy the property that any b + zero = b</span> |
| <span class="kd">public</span> <span class="n">Average</span> <span class="nf">zero</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="k">new</span> <span class="n">Average</span><span class="o">(</span><span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="mi">0</span><span class="n">L</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="c1">// Combine two values to produce a new value. For performance, the function may modify `buffer`</span> |
| <span class="c1">// and return it instead of constructing a new object</span> |
| <span class="kd">public</span> <span class="n">Average</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Average</span> <span class="n">buffer</span><span class="o">,</span> <span class="n">Employee</span> <span class="n">employee</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kt">long</span> <span class="n">newSum</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getSum</span><span class="o">()</span> <span class="o">+</span> <span class="n">employee</span><span class="o">.</span><span class="na">getSalary</span><span class="o">();</span> |
| <span class="kt">long</span> <span class="n">newCount</span> <span class="o">=</span> <span class="n">buffer</span><span class="o">.</span><span class="na">getCount</span><span class="o">()</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span> |
| <span class="n">buffer</span><span class="o">.</span><span class="na">setSum</span><span class="o">(</span><span class="n">newSum</span><span class="o">);</span> |
| <span class="n">buffer</span><span class="o">.</span><span class="na">setCount</span><span class="o">(</span><span class="n">newCount</span><span class="o">);</span> |
| <span class="k">return</span> <span class="n">buffer</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="c1">// Merge two intermediate values</span> |
| <span class="kd">public</span> <span class="n">Average</span> <span class="nf">merge</span><span class="o">(</span><span class="n">Average</span> <span class="n">b1</span><span class="o">,</span> <span class="n">Average</span> <span class="n">b2</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kt">long</span> <span class="n">mergedSum</span> <span class="o">=</span> <span class="n">b1</span><span class="o">.</span><span class="na">getSum</span><span class="o">()</span> <span class="o">+</span> <span class="n">b2</span><span class="o">.</span><span class="na">getSum</span><span class="o">();</span> |
| <span class="kt">long</span> <span class="n">mergedCount</span> <span class="o">=</span> <span class="n">b1</span><span class="o">.</span><span class="na">getCount</span><span class="o">()</span> <span class="o">+</span> <span class="n">b2</span><span class="o">.</span><span class="na">getCount</span><span class="o">();</span> |
| <span class="n">b1</span><span class="o">.</span><span class="na">setSum</span><span class="o">(</span><span class="n">mergedSum</span><span class="o">);</span> |
| <span class="n">b1</span><span class="o">.</span><span class="na">setCount</span><span class="o">(</span><span class="n">mergedCount</span><span class="o">);</span> |
| <span class="k">return</span> <span class="n">b1</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="c1">// Transform the output of the reduction</span> |
| <span class="kd">public</span> <span class="n">Double</span> <span class="nf">finish</span><span class="o">(</span><span class="n">Average</span> <span class="n">reduction</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="o">((</span><span class="kt">double</span><span class="o">)</span> <span class="n">reduction</span><span class="o">.</span><span class="na">getSum</span><span class="o">())</span> <span class="o">/</span> <span class="n">reduction</span><span class="o">.</span><span class="na">getCount</span><span class="o">();</span> |
| <span class="o">}</span> |
| <span class="c1">// Specifies the Encoder for the intermediate value type</span> |
| <span class="kd">public</span> <span class="n">Encoder</span><span class="o"><</span><span class="n">Average</span><span class="o">></span> <span class="nf">bufferEncoder</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">bean</span><span class="o">(</span><span class="n">Average</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="c1">// Specifies the Encoder for the final output value type</span> |
| <span class="kd">public</span> <span class="n">Encoder</span><span class="o"><</span><span class="n">Double</span><span class="o">></span> <span class="nf">outputEncoder</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">DOUBLE</span><span class="o">();</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| |
| <span class="n">Encoder</span><span class="o"><</span><span class="n">Employee</span><span class="o">></span> <span class="n">employeeEncoder</span> <span class="o">=</span> <span class="n">Encoders</span><span class="o">.</span><span class="na">bean</span><span class="o">(</span><span class="n">Employee</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| <span class="n">String</span> <span class="n">path</span> <span class="o">=</span> <span class="s">"examples/src/main/resources/employees.json"</span><span class="o">;</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Employee</span><span class="o">></span> <span class="n">ds</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="n">path</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="n">employeeEncoder</span><span class="o">);</span> |
| <span class="n">ds</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +-------+------+</span> |
| <span class="c1">// | name|salary|</span> |
| <span class="c1">// +-------+------+</span> |
| <span class="c1">// |Michael| 3000|</span> |
| <span class="c1">// | Andy| 4500|</span> |
| <span class="c1">// | Justin| 3500|</span> |
| <span class="c1">// | Berta| 4000|</span> |
| <span class="c1">// +-------+------+</span> |
| |
| <span class="n">MyAverage</span> <span class="n">myAverage</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyAverage</span><span class="o">();</span> |
| <span class="c1">// Convert the function to a `TypedColumn` and give it a name</span> |
| <span class="n">TypedColumn</span><span class="o"><</span><span class="n">Employee</span><span class="o">,</span> <span class="n">Double</span><span class="o">></span> <span class="n">averageSalary</span> <span class="o">=</span> <span class="n">myAverage</span><span class="o">.</span><span class="na">toColumn</span><span class="o">().</span><span class="na">name</span><span class="o">(</span><span class="s">"average_salary"</span><span class="o">);</span> |
| <span class="n">Dataset</span><span class="o"><</span><span class="n">Double</span><span class="o">></span> <span class="n">result</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">averageSalary</span><span class="o">);</span> |
| <span class="n">result</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// +--------------+</span> |
| <span class="c1">// |average_salary|</span> |
| <span class="c1">// +--------------+</span> |
| <span class="c1">// | 3750.0|</span> |
| <span class="c1">// +--------------+</span> |
| </pre></div> |
| <div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedTypedAggregation.java" in the Spark repo.</small></div> |
| </div> |
| </div> |
| |
| |
| </div> |
| |
| <!-- /container --> |
| </div> |
| |
| <script src="js/vendor/jquery-1.12.4.min.js"></script> |
| <script src="js/vendor/bootstrap.min.js"></script> |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <!-- MathJax Section --> |
<script type="text/x-mathjax-config">
  // Pre-load MathJax configuration: enable AMS-style automatic numbering
  // for displayed equations (picked up by the MathJax.js loaded below).
  MathJax.Hub.Config({
    TeX: { equationNumbers: { autoNumber: "AMS" } }
  });
</script>
<script>
  // Inject the MathJax <script> tag dynamically so the page also works when
  // opened directly from disk (file://), where a protocol-relative
  // "//cdn..." URL would not resolve.
  (function(d, script) {
    script = d.createElement('script');
    script.async = true;
    script.onload = function(){
      MathJax.Hub.Config({
        tex2jax: {
          inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
          displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
          processEscapes: true,
          skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
        }
      });
    };
    // Always load over https: the old protocol check served MathJax over
    // plain http on http pages, allowing script injection in transit.
    // An absolute https URL works from file://, http and https alike.
    script.src = 'https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
      '?config=TeX-AMS-MML_HTMLorMML';
    d.getElementsByTagName('head')[0].appendChild(script);
  }(document));
</script>
| </body> |
| </html> |