| <!DOCTYPE html> |
| <!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--> |
| <!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]--> |
| <!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <title>Spark SQL and DataFrames - Spark 1.3.0 Documentation</title> |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <style> |
| body { |
| padding-top: 60px; |
| padding-bottom: 40px; |
| } |
| </style> |
| <meta name="viewport" content="width=device-width"> |
| <link rel="stylesheet" href="css/bootstrap-responsive.min.css"> |
| <link rel="stylesheet" href="css/main.css"> |
| |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| |
| |
| |
| </head> |
| <body> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| |
| <div class="navbar navbar-fixed-top" id="topbar"> |
| <div class="navbar-inner"> |
| <div class="container"> |
| <div class="brand"><a href="index.html"> |
| <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.3.0</span> |
| </div> |
| <ul class="nav"> |
<!--TODO(andyk): Add class="active" attribute to li somehow.-->
| <li><a href="index.html">Overview</a></li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="quick-start.html">Quick Start</a></li> |
| <li><a href="programming-guide.html">Spark Programming Guide</a></li> |
| <li class="divider"></li> |
| <li><a href="streaming-programming-guide.html">Spark Streaming</a></li> |
| <li><a href="sql-programming-guide.html">DataFrames and SQL</a></li> |
| <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li> |
| <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li> |
| <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li> |
| <li><a href="api/java/index.html">Java</a></li> |
| <li><a href="api/python/index.html">Python</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="cluster-overview.html">Overview</a></li> |
| <li><a href="submitting-applications.html">Submitting Applications</a></li> |
| <li class="divider"></li> |
| <li><a href="spark-standalone.html">Spark Standalone</a></li> |
| <li><a href="running-on-mesos.html">Mesos</a></li> |
| <li><a href="running-on-yarn.html">YARN</a></li> |
| <li class="divider"></li> |
| <li><a href="ec2-scripts.html">Amazon EC2</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="configuration.html">Configuration</a></li> |
| <li><a href="monitoring.html">Monitoring</a></li> |
| <li><a href="tuning.html">Tuning Guide</a></li> |
| <li><a href="job-scheduling.html">Job Scheduling</a></li> |
| <li><a href="security.html">Security</a></li> |
| <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li> |
| <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li> |
| <li class="divider"></li> |
| <li><a href="building-spark.html">Building Spark</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Supplemental+Spark+Projects">Supplemental Projects</a></li> |
| </ul> |
| </li> |
| </ul> |
| <!--<p class="navbar-text pull-right"><span class="version-text">v1.3.0</span></p>--> |
| </div> |
| </div> |
| </div> |
| |
| <div class="container" id="content"> |
| |
| <h1 class="title">Spark SQL and DataFrame Guide</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#overview">Overview</a></li> |
| <li><a href="#dataframes">DataFrames</a> <ul> |
| <li><a href="#starting-point-sqlcontext">Starting Point: <code>SQLContext</code></a></li> |
| <li><a href="#creating-dataframes">Creating DataFrames</a></li> |
| <li><a href="#dataframe-operations">DataFrame Operations</a></li> |
| <li><a href="#running-sql-queries-programmatically">Running SQL Queries Programmatically</a></li> |
| <li><a href="#interoperating-with-rdds">Interoperating with RDDs</a> <ul> |
| <li><a href="#inferring-the-schema-using-reflection">Inferring the Schema Using Reflection</a></li> |
| <li><a href="#programmatically-specifying-the-schema">Programmatically Specifying the Schema</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li><a href="#data-sources">Data Sources</a> <ul> |
| <li><a href="#generic-loadsave-functions">Generic Load/Save Functions</a> <ul> |
| <li><a href="#manually-specifying-options">Manually Specifying Options</a></li> |
| <li><a href="#save-modes">Save Modes</a></li> |
| <li><a href="#saving-to-persistent-tables">Saving to Persistent Tables</a></li> |
| </ul> |
| </li> |
| <li><a href="#parquet-files">Parquet Files</a> <ul> |
| <li><a href="#loading-data-programmatically">Loading Data Programmatically</a></li> |
| <li><a href="#partition-discovery">Partition discovery</a></li> |
| <li><a href="#schema-merging">Schema merging</a></li> |
| <li><a href="#configuration">Configuration</a></li> |
| </ul> |
| </li> |
| <li><a href="#json-datasets">JSON Datasets</a></li> |
| <li><a href="#hive-tables">Hive Tables</a></li> |
| <li><a href="#jdbc-to-other-databases">JDBC To Other Databases</a></li> |
| <li><a href="#troubleshooting">Troubleshooting</a></li> |
| </ul> |
| </li> |
| <li><a href="#performance-tuning">Performance Tuning</a> <ul> |
| <li><a href="#caching-data-in-memory">Caching Data In Memory</a></li> |
| <li><a href="#other-configuration-options">Other Configuration Options</a></li> |
| </ul> |
| </li> |
| <li><a href="#distributed-sql-engine">Distributed SQL Engine</a> <ul> |
| <li><a href="#running-the-thrift-jdbcodbc-server">Running the Thrift JDBC/ODBC server</a></li> |
| <li><a href="#running-the-spark-sql-cli">Running the Spark SQL CLI</a></li> |
| </ul> |
| </li> |
| <li><a href="#migration-guide">Migration Guide</a> <ul> |
| <li><a href="#upgrading-from-spark-sql-10-12-to-13">Upgrading from Spark SQL 1.0-1.2 to 1.3</a> <ul> |
| <li><a href="#rename-of-schemardd-to-dataframe">Rename of SchemaRDD to DataFrame</a></li> |
| <li><a href="#unification-of-the-java-and-scala-apis">Unification of the Java and Scala APIs</a></li> |
| <li><a href="#isolation-of-implicit-conversions-and-removal-of-dsl-package-scala-only">Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)</a></li> |
| <li><a href="#removal-of-the-type-aliases-in-orgapachesparksql-for-datatype-scala-only">Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)</a></li> |
| <li><a href="#udf-registration-moved-to-sqlcontextudf-java--scala">UDF Registration Moved to <code>sqlContext.udf</code> (Java & Scala)</a></li> |
| <li><a href="#python-datatypes-no-longer-singletons">Python DataTypes No Longer Singletons</a></li> |
| </ul> |
| </li> |
| <li><a href="#migration-guide-for-shark-user">Migration Guide for Shark User</a> <ul> |
| <li><a href="#scheduling">Scheduling</a></li> |
| <li><a href="#reducer-number">Reducer number</a></li> |
| <li><a href="#caching">Caching</a></li> |
| </ul> |
| </li> |
| <li><a href="#compatibility-with-apache-hive">Compatibility with Apache Hive</a> <ul> |
| <li><a href="#deploying-in-existing-hive-warehouses">Deploying in Existing Hive Warehouses</a></li> |
| <li><a href="#supported-hive-features">Supported Hive Features</a></li> |
| <li><a href="#unsupported-hive-functionality">Unsupported Hive Functionality</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li><a href="#data-types">Data Types</a></li> |
| </ul> |
| |
| <h1 id="overview">Overview</h1> |
| |
<p>Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.</p>
| |
| <h1 id="dataframes">DataFrames</h1> |
| |
| <p>A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.</p> |
| |
| <p>The DataFrame API is available in <a href="api/scala/index.html#org.apache.spark.sql.DataFrame">Scala</a>, <a href="api/java/index.html?org/apache/spark/sql/DataFrame.html">Java</a>, and <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">Python</a>.</p> |
| |
| <p>All of the examples on this page use sample data included in the Spark distribution and can be run in the <code>spark-shell</code> or the <code>pyspark</code> shell.</p> |
| |
| <h2 id="starting-point-sqlcontext">Starting Point: <code>SQLContext</code></h2> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <p>The entry point into all functionality in Spark SQL is the |
| <a href="api/scala/index.html#org.apache.spark.sql.`SQLContext`"><code>SQLContext</code></a> class, or one of its |
| descendants. To create a basic <code>SQLContext</code>, all you need is a SparkContext.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sc</span><span class="k">:</span> <span class="kt">SparkContext</span> <span class="c1">// An existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// this is used to implicitly convert an RDD to a DataFrame.</span> |
| <span class="k">import</span> <span class="nn">sqlContext.implicits._</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>The entry point into all functionality in Spark SQL is the |
| <a href="api/java/index.html#org.apache.spark.sql.SQLContext"><code>SQLContext</code></a> class, or one of its |
descendants. To create a basic <code>SQLContext</code>, all you need is a JavaSparkContext.</p>
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaSparkContext</span> <span class="n">sc</span> <span class="o">=</span> <span class="o">...;</span> <span class="c1">// An existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>The entry point into all relational functionality in Spark is the |
| <a href="api/python/pyspark.sql.SQLContext-class.html"><code>SQLContext</code></a> class, or one |
of its descendants. To create a basic <code>SQLContext</code>, all you need is a SparkContext.</p>
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <p>In addition to the basic <code>SQLContext</code>, you can also create a <code>HiveContext</code>, which provides a |
| superset of the functionality provided by the basic <code>SQLContext</code>. Additional features include |
| the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the |
| ability to read data from Hive tables. To use a <code>HiveContext</code>, you do not need to have an |
| existing Hive setup, and all of the data sources available to a <code>SQLContext</code> are still available. |
| <code>HiveContext</code> is only packaged separately to avoid including all of Hive’s dependencies in the default |
| Spark build. If these dependencies are not a problem for your application then using <code>HiveContext</code> |
| is recommended for the 1.3 release of Spark. Future releases will focus on bringing <code>SQLContext</code> up |
| to feature parity with a <code>HiveContext</code>.</p> |
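
<p>As a minimal sketch (it assumes your application has the spark-hive module on its classpath), a <code>HiveContext</code> can be created in much the same way as a <code>SQLContext</code>:</p>

<div class="highlight"><pre><code class="language-scala" data-lang="scala">// A minimal sketch; assumes the spark-hive module is on the classpath.
val sc: SparkContext // An existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// hiveContext supports everything a basic SQLContext does, plus HiveQL and Hive UDFs.
import hiveContext.implicits._</code></pre></div>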
| |
| <p>The specific variant of SQL that is used to parse queries can also be selected using the |
| <code>spark.sql.dialect</code> option. This parameter can be changed using either the <code>setConf</code> method on |
| a <code>SQLContext</code> or by using a <code>SET key=value</code> command in SQL. For a <code>SQLContext</code>, the only dialect |
| available is “sql” which uses a simple SQL parser provided by Spark SQL. In a <code>HiveContext</code>, the |
| default is “hiveql”, though “sql” is also available. Since the HiveQL parser is much more complete, |
| this is recommended for most use cases.</p> |
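
<p>As an illustrative sketch (assuming an existing <code>HiveContext</code> named <code>hiveContext</code>), the dialect can be switched in either of the following ways:</p>

<div class="highlight"><pre><code class="language-scala" data-lang="scala">// Switch to the simple SQL parser using setConf ...
hiveContext.setConf("spark.sql.dialect", "sql")

// ... or switch back to HiveQL with a SET command in SQL.
hiveContext.sql("SET spark.sql.dialect=hiveql")</code></pre></div>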
| |
| <h2 id="creating-dataframes">Creating DataFrames</h2> |
| |
| <p>With a <code>SQLContext</code>, applications can create <code>DataFrame</code>s from an <a href="#interoperating-with-rdds">existing <code>RDD</code></a>, from a Hive table, or from <a href="#data-sources">data sources</a>.</p> |
| |
| <p>As an example, the following creates a <code>DataFrame</code> based on the content of a JSON file:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sc</span><span class="k">:</span> <span class="kt">SparkContext</span> <span class="c1">// An existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">)</span> |
| |
| <span class="c1">// Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="o">()</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaSparkContext</span> <span class="n">sc</span> <span class="o">=</span> <span class="o">...;</span> <span class="c1">// An existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">jsonFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">);</span> |
| |
| <span class="c1">// Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">show</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonFile</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c"># Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="p">()</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h2 id="dataframe-operations">DataFrame Operations</h2> |
| |
| <p>DataFrames provide a domain-specific language for structured data manipulation in <a href="api/scala/index.html#org.apache.spark.sql.DataFrame">Scala</a>, <a href="api/java/index.html?org/apache/spark/sql/DataFrame.html">Java</a>, and <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">Python</a>.</p> |
| |
| <p>Here we include some basic examples of structured data processing using DataFrames:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sc</span><span class="k">:</span> <span class="kt">SparkContext</span> <span class="c1">// An existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// Create the DataFrame</span> |
| <span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">)</span> |
| |
| <span class="c1">// Show the content of the DataFrame</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// age name</span> |
| <span class="c1">// null Michael</span> |
| <span class="c1">// 30 Andy</span> |
| <span class="c1">// 19 Justin</span> |
| |
| <span class="c1">// Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: long (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// name</span> |
| <span class="c1">// Michael</span> |
| <span class="c1">// Andy</span> |
| <span class="c1">// Justin</span> |
| |
| <span class="c1">// Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="n">df</span><span class="o">(</span><span class="s">"age"</span><span class="o">)</span> <span class="o">+</span> <span class="mi">1</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// name (age + 1)</span> |
| <span class="c1">// Michael null</span> |
| <span class="c1">// Andy 31</span> |
| <span class="c1">// Justin 20</span> |
| |
| <span class="c1">// Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">df</span><span class="o">(</span><span class="s">"name"</span><span class="o">)</span> <span class="o">></span> <span class="mi">21</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// age name</span> |
| <span class="c1">// 30 Andy</span> |
| |
| <span class="c1">// Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="n">count</span><span class="o">().</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// age count</span> |
| <span class="c1">// null 1</span> |
| <span class="c1">// 19 1</span> |
| <span class="c1">// 30 1</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">val</span> <span class="nl">sc:</span> <span class="n">JavaSparkContext</span> <span class="c1">// An existing SparkContext.</span> |
| <span class="n">val</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// Create the DataFrame</span> |
| <span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">jsonFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">);</span> |
| |
| <span class="c1">// Show the content of the DataFrame</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// age name</span> |
| <span class="c1">// null Michael</span> |
| <span class="c1">// 30 Andy</span> |
| <span class="c1">// 19 Justin</span> |
| |
| <span class="c1">// Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">printSchema</span><span class="o">();</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: long (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// name</span> |
| <span class="c1">// Michael</span> |
| <span class="c1">// Andy</span> |
| <span class="c1">// Justin</span> |
| |
| <span class="c1">// Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="n">df</span><span class="o">.</span><span class="na">col</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="na">plus</span><span class="o">(</span><span class="mi">1</span><span class="o">)).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// name (age + 1)</span> |
| <span class="c1">// Michael null</span> |
| <span class="c1">// Andy 31</span> |
| <span class="c1">// Justin 20</span> |
| |
| <span class="c1">// Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">df</span><span class="o">(</span><span class="s">"name"</span><span class="o">)</span> <span class="o">></span> <span class="mi">21</span><span class="o">).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// age name</span> |
| <span class="c1">// 30 Andy</span> |
| |
| <span class="c1">// Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="na">count</span><span class="o">().</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// age count</span> |
| <span class="c1">// null 1</span> |
| <span class="c1">// 19 1</span> |
| <span class="c1">// 30 1</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="c"># Create the DataFrame</span> |
| <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonFile</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c"># Show the content of the DataFrame</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## age name</span> |
| <span class="c">## null Michael</span> |
| <span class="c">## 30 Andy</span> |
| <span class="c">## 19 Justin</span> |
| |
| <span class="c"># Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| <span class="c">## root</span> |
| <span class="c">## |-- age: long (nullable = true)</span> |
| <span class="c">## |-- name: string (nullable = true)</span> |
| |
| <span class="c"># Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"name"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## name</span> |
| <span class="c">## Michael</span> |
| <span class="c">## Andy</span> |
| <span class="c">## Justin</span> |
| |
| <span class="c"># Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"name"</span><span class="p">,</span> <span class="n">df</span><span class="o">.</span><span class="n">age</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## name (age + 1)</span> |
| <span class="c">## Michael null</span> |
| <span class="c">## Andy 31</span> |
| <span class="c">## Justin 20</span> |
| |
| <span class="c"># Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">name</span> <span class="o">></span> <span class="mi">21</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## age name</span> |
| <span class="c">## 30 Andy</span> |
| |
| <span class="c"># Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="p">(</span><span class="s">"age"</span><span class="p">)</span><span class="o">.</span><span class="n">count</span><span class="p">()</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## age count</span> |
| <span class="c">## null 1</span> |
| <span class="c">## 19 1</span> |
| <span class="c">## 30 1</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h2 id="running-sql-queries-programmatically">Running SQL Queries Programmatically</h2> |
| |
| <p>The <code>sql</code> function on a <code>SQLContext</code> enables applications to run SQL queries programmatically and returns the result as a <code>DataFrame</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="o">...</span> <span class="c1">// An existing SQLContext</span> |
| <span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT * FROM table"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">val</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// An existing SQLContext</span> |
| <span class="n">val</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT * FROM table"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT * FROM table"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h2 id="interoperating-with-rdds">Interoperating with RDDs</h2> |
| |
| <p>Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first |
| method uses reflection to infer the schema of an RDD that contains specific types of objects. This |
| reflection based approach leads to more concise code and works well when you already know the schema |
| while writing your Spark application.</p> |
| |
| <p>The second method for creating DataFrames is through a programmatic interface that allows you to |
| construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows |
| you to construct DataFrames when the columns and their types are not known until runtime.</p> |
| |
| <h3 id="inferring-the-schema-using-reflection">Inferring the Schema Using Reflection</h3> |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <p>The Scala interface for Spark SQL supports automatically converting an RDD containing case classes |
| to a DataFrame. The case class |
| defines the schema of the table. The names of the arguments to the case class are read using |
| reflection and become the names of the columns. Case classes can also be nested or contain complex |
| types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be |
| registered as a table. Tables can be used in subsequent SQL statements.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sc is an existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| <span class="c1">// this is used to implicitly convert an RDD to a DataFrame.</span> |
| <span class="k">import</span> <span class="nn">sqlContext.implicits._</span> |
| |
| <span class="c1">// Define the schema using a case class.</span> |
| <span class="c1">// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,</span> |
| <span class="c1">// you can use custom classes that implement the Product interface.</span> |
| <span class="k">case</span> <span class="k">class</span> <span class="nc">Person</span><span class="o">(</span><span class="n">name</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">age</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> |
| |
| <span class="c1">// Create an RDD of Person objects and register it as a table.</span> |
| <span class="k">val</span> <span class="n">people</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">)).</span><span class="n">map</span><span class="o">(</span><span class="n">p</span> <span class="k">=></span> <span class="nc">Person</span><span class="o">(</span><span class="n">p</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">p</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">trim</span><span class="o">.</span><span class="n">toInt</span><span class="o">)).</span><span class="n">toDF</span><span class="o">()</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by sqlContext.</span> |
| <span class="k">val</span> <span class="n">teenagers</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="o">)</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations.</span> |
| <span class="c1">// The columns of a row in the result can be accessed by ordinal.</span> |
| <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>Spark SQL supports automatically converting an RDD of <a href="http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly">JavaBeans</a> |
| into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. |
Currently, Spark SQL does not support JavaBeans that contain
nested or complex types such as Lists or Arrays. You can create a JavaBean by creating a
| class that implements Serializable and has getters and setters for all of its fields.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Person</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kt">int</span> <span class="n">age</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">getName</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">name</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setName</span><span class="o">(</span><span class="n">String</span> <span class="n">name</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">name</span> <span class="o">=</span> <span class="n">name</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">int</span> <span class="nf">getAge</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">age</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setAge</span><span class="o">(</span><span class="kt">int</span> <span class="n">age</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">age</span> <span class="o">=</span> <span class="n">age</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>A schema can be applied to an existing RDD by calling <code>createDataFrame</code> and providing the Class object |
| for the JavaBean.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sc is an existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="c1">// Load a text file and convert each line to a JavaBean.</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">Person</span><span class="o">></span> <span class="n">people</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">).</span><span class="na">map</span><span class="o">(</span> |
| <span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Person</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">Person</span> <span class="nf">call</span><span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">parts</span> <span class="o">=</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span> |
| |
| <span class="n">Person</span> <span class="n">person</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Person</span><span class="o">();</span> |
| <span class="n">person</span><span class="o">.</span><span class="na">setName</span><span class="o">(</span><span class="n">parts</span><span class="o">[</span><span class="mi">0</span><span class="o">]);</span> |
| <span class="n">person</span><span class="o">.</span><span class="na">setAge</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">parts</span><span class="o">[</span><span class="mi">1</span><span class="o">].</span><span class="na">trim</span><span class="o">()));</span> |
| |
| <span class="k">return</span> <span class="n">person</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">});</span> |
| |
| <span class="c1">// Apply a schema to an RDD of JavaBeans and register it as a table.</span> |
| <span class="n">DataFrame</span> <span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">people</span><span class="o">,</span> <span class="n">Person</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="na">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// SQL can be run over RDDs that have been registered as tables.</span> |
| <span class="n">DataFrame</span> <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="o">)</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations.</span> |
| <span class="c1">// The columns of a row in the result can be accessed by ordinal.</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">teenagerNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">call</span><span class="o">(</span><span class="n">Row</span> <span class="n">row</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}).</span><span class="na">collect</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
<p>Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing
key/value pairs as kwargs to the Row class. The keys of these kwargs define the column names of the table,
| and the types are inferred by looking at the first row. Since we currently only look at the first |
| row, it is important that there is no missing data in the first row of the RDD. In future versions we |
| plan to more completely infer the schema by looking at more data, similar to the inference that is |
| performed on JSON files.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sc is an existing SparkContext.</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span><span class="p">,</span> <span class="n">Row</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="c"># Load a text file and convert each line to a Row.</span> |
| <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="p">)</span> |
| <span class="n">parts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">l</span><span class="p">:</span> <span class="n">l</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s">","</span><span class="p">))</span> |
| <span class="n">people</span> <span class="o">=</span> <span class="n">parts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">p</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">age</span><span class="o">=</span><span class="nb">int</span><span class="p">(</span><span class="n">p</span><span class="p">[</span><span class="mi">1</span><span class="p">])))</span> |
| |
| <span class="c"># Infer the schema, and register the DataFrame as a table.</span> |
| <span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">inferSchema</span><span class="p">(</span><span class="n">people</span><span class="p">)</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="n">registerTempTable</span><span class="p">(</span><span class="s">"people"</span><span class="p">)</span> |
| |
| <span class="c"># SQL can be run over DataFrames that have been registered as a table.</span> |
| <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="p">)</span> |
| |
| <span class="c"># The results of SQL queries are RDDs and support all the normal RDD operations.</span> |
| <span class="n">teenNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">teenName</span> <span class="ow">in</span> <span class="n">teenNames</span><span class="o">.</span><span class="n">collect</span><span class="p">():</span> |
| <span class="k">print</span> <span class="n">teenName</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="programmatically-specifying-the-schema">Programmatically Specifying the Schema</h3> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <p>When case classes cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed |
| and fields will be projected differently for different users), |
| a <code>DataFrame</code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of <code>Row</code>s from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| <code>Row</code>s in the RDD created in Step 1.</li> |
| <li>Apply the schema to the RDD of <code>Row</code>s via <code>createDataFrame</code> method provided |
| by <code>SQLContext</code>.</li> |
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sc is an existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// Create an RDD</span> |
| <span class="k">val</span> <span class="n">people</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">)</span> |
| |
| <span class="c1">// The schema is encoded in a string</span> |
| <span class="k">val</span> <span class="n">schemaString</span> <span class="k">=</span> <span class="s">"name age"</span> |
| |
| <span class="c1">// Import Spark SQL data types and Row.</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.sql._</span> |
| |
| <span class="c1">// Generate the schema based on the string of schema</span> |
| <span class="k">val</span> <span class="n">schema</span> <span class="k">=</span> |
| <span class="nc">StructType</span><span class="o">(</span> |
| <span class="n">schemaString</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">fieldName</span> <span class="k">=></span> <span class="nc">StructField</span><span class="o">(</span><span class="n">fieldName</span><span class="o">,</span> <span class="nc">StringType</span><span class="o">,</span> <span class="kc">true</span><span class="o">)))</span> |
| |
| <span class="c1">// Convert records of the RDD (people) to Rows.</span> |
| <span class="k">val</span> <span class="n">rowRDD</span> <span class="k">=</span> <span class="n">people</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">)).</span><span class="n">map</span><span class="o">(</span><span class="n">p</span> <span class="k">=></span> <span class="nc">Row</span><span class="o">(</span><span class="n">p</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">p</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">trim</span><span class="o">))</span> |
| |
| <span class="c1">// Apply the schema to the RDD.</span> |
| <span class="k">val</span> <span class="n">peopleDataFrame</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="o">(</span><span class="n">rowRDD</span><span class="o">,</span> <span class="n">schema</span><span class="o">)</span> |
| |
| <span class="c1">// Register the DataFrames as a table.</span> |
| <span class="n">peopleDataFrame</span><span class="o">.</span><span class="n">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by sqlContext.</span> |
| <span class="k">val</span> <span class="n">results</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people"</span><span class="o">)</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations.</span> |
| <span class="c1">// The columns of a row in the result can be accessed by ordinal.</span> |
| <span class="n">results</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>When JavaBean classes cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed and |
| fields will be projected differently for different users), |
| a <code>DataFrame</code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of <code>Row</code>s from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| <code>Row</code>s in the RDD created in Step 1.</li> |
| <li>Apply the schema to the RDD of <code>Row</code>s via <code>createDataFrame</code> method provided |
| by <code>SQLContext</code>.</li> |
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Import factory methods provided by DataType.</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.DataType</span><span class="o">;</span> |
| <span class="c1">// Import StructType and StructField</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructType</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructField</span><span class="o">;</span> |
| <span class="c1">// Import Row.</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| |
| <span class="c1">// sc is an existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="c1">// Load a text file and convert each line to a JavaBean.</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">people</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">);</span> |
| |
| <span class="c1">// The schema is encoded in a string</span> |
| <span class="n">String</span> <span class="n">schemaString</span> <span class="o">=</span> <span class="s">"name age"</span><span class="o">;</span> |
| |
| <span class="c1">// Generate the schema based on the schema string</span>
| <span class="n">List</span><span class="o"><</span><span class="n">StructField</span><span class="o">></span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">StructField</span><span class="o">>();</span>
| <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="nl">fieldName:</span> <span class="n">schemaString</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> <span class="o">{</span>
| <span class="n">fields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="n">fieldName</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">StringType</span><span class="o">,</span> <span class="kc">true</span><span class="o">));</span>
| <span class="o">}</span>
| <span class="n">StructType</span> <span class="n">schema</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructType</span><span class="o">(</span><span class="n">fields</span><span class="o">);</span>
| |
| <span class="c1">// Convert records of the RDD (people) to Rows.</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">rowRDD</span> <span class="o">=</span> <span class="n">people</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> |
| <span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Row</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">Row</span> <span class="nf">call</span><span class="o">(</span><span class="n">String</span> <span class="n">record</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">fields</span> <span class="o">=</span> <span class="n">record</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span> |
| <span class="k">return</span> <span class="n">RowFactory</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">fields</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="n">fields</span><span class="o">[</span><span class="mi">1</span><span class="o">].</span><span class="na">trim</span><span class="o">());</span>
| <span class="o">}</span> |
| <span class="o">});</span> |
| |
| <span class="c1">// Apply the schema to the RDD.</span> |
| <span class="n">DataFrame</span> <span class="n">peopleDataFrame</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">rowRDD</span><span class="o">,</span> <span class="n">schema</span><span class="o">);</span> |
| |
| <span class="c1">// Register the DataFrame as a table.</span> |
| <span class="n">peopleDataFrame</span><span class="o">.</span><span class="na">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// SQL can be run over RDDs that have been registered as tables.</span> |
| <span class="n">DataFrame</span> <span class="n">results</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people"</span><span class="o">);</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations.</span> |
| <span class="c1">// The columns of a row in the result can be accessed by ordinal.</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">names</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">call</span><span class="o">(</span><span class="n">Row</span> <span class="n">row</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}).</span><span class="na">collect</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>When a dictionary of kwargs cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed and |
| fields will be projected differently for different users), |
| a <code>DataFrame</code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of tuples or lists from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| tuples or lists in the RDD created in step 1.</li>
| <li>Apply the schema to the RDD via the <code>createDataFrame</code> method provided by <code>SQLContext</code>.</li>
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># Import SQLContext and data types</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="o">*</span> |
| |
| <span class="c"># sc is an existing SparkContext.</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="c"># Load a text file and convert each line to a tuple.</span> |
| <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="p">)</span> |
| <span class="n">parts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">l</span><span class="p">:</span> <span class="n">l</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s">","</span><span class="p">))</span> |
| <span class="n">people</span> <span class="o">=</span> <span class="n">parts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="p">(</span><span class="n">p</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">p</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">strip</span><span class="p">()))</span> |
| |
| <span class="c"># The schema is encoded in a string.</span> |
| <span class="n">schemaString</span> <span class="o">=</span> <span class="s">"name age"</span> |
| |
| <span class="n">fields</span> <span class="o">=</span> <span class="p">[</span><span class="n">StructField</span><span class="p">(</span><span class="n">field_name</span><span class="p">,</span> <span class="n">StringType</span><span class="p">(),</span> <span class="bp">True</span><span class="p">)</span> <span class="k">for</span> <span class="n">field_name</span> <span class="ow">in</span> <span class="n">schemaString</span><span class="o">.</span><span class="n">split</span><span class="p">()]</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">(</span><span class="n">fields</span><span class="p">)</span> |
| |
| <span class="c"># Apply the schema to the RDD.</span> |
| <span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">people</span><span class="p">,</span> <span class="n">schema</span><span class="p">)</span> |
| |
| <span class="c"># Register the DataFrame as a table.</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="n">registerTempTable</span><span class="p">(</span><span class="s">"people"</span><span class="p">)</span> |
| |
| <span class="c"># SQL can be run over DataFrames that have been registered as a table.</span> |
| <span class="n">results</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT name FROM people"</span><span class="p">)</span> |
| |
| <span class="c"># The results of SQL queries are DataFrames and support all the normal RDD operations.</span>
| <span class="n">names</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">names</span><span class="o">.</span><span class="n">collect</span><span class="p">():</span> |
| <span class="k">print</span> <span class="n">name</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h1 id="data-sources">Data Sources</h1> |
| |
| <p>Spark SQL supports operating on a variety of data sources through the <code>DataFrame</code> interface. |
| A DataFrame can be operated on as a normal RDD and can also be registered as a temporary table.
| Registering a DataFrame as a table allows you to run SQL queries over its data. This section |
| describes the general methods for loading and saving data using the Spark Data Sources and then |
| goes into specific options that are available for the built-in data sources.</p> |
| |
| <h2 id="generic-loadsave-functions">Generic Load/Save Functions</h2> |
| |
| <p>In the simplest form, the default data source (<code>parquet</code> unless otherwise configured by |
| <code>spark.sql.sources.default</code>) will be used for all operations.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">load</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"age"</span><span class="o">).</span><span class="n">save</span><span class="o">(</span><span class="s">"namesAndAges.parquet"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">load</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">);</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"age"</span><span class="o">).</span><span class="na">save</span><span class="o">(</span><span class="s">"namesAndAges.parquet"</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s">"people.parquet"</span><span class="p">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"name"</span><span class="p">,</span> <span class="s">"age"</span><span class="p">)</span><span class="o">.</span><span class="n">save</span><span class="p">(</span><span class="s">"namesAndAges.parquet"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h3 id="manually-specifying-options">Manually Specifying Options</h3> |
| |
| <p>You can also manually specify the data source that will be used along with any extra options |
| that you would like to pass to the data source. Data sources are specified by their fully qualified |
| name (i.e., <code>org.apache.spark.sql.parquet</code>), but for built-in sources you can also use their short
| names (<code>json</code>, <code>parquet</code>, <code>jdbc</code>). DataFrames loaded from any data source type can be converted into other types
| using this syntax.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">load</span><span class="o">(</span><span class="s">"people.json"</span><span class="o">,</span> <span class="s">"json"</span><span class="o">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"age"</span><span class="o">).</span><span class="n">save</span><span class="o">(</span><span class="s">"namesAndAges.parquet"</span><span class="o">,</span> <span class="s">"parquet"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">load</span><span class="o">(</span><span class="s">"people.json"</span><span class="o">,</span> <span class="s">"json"</span><span class="o">);</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"age"</span><span class="o">).</span><span class="na">save</span><span class="o">(</span><span class="s">"namesAndAges.parquet"</span><span class="o">,</span> <span class="s">"parquet"</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s">"people.json"</span><span class="p">,</span> <span class="s">"json"</span><span class="p">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"name"</span><span class="p">,</span> <span class="s">"age"</span><span class="p">)</span><span class="o">.</span><span class="n">save</span><span class="p">(</span><span class="s">"namesAndAges.parquet"</span><span class="p">,</span> <span class="s">"parquet"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h3 id="save-modes">Save Modes</h3> |
| |
| <p>Save operations can optionally take a <code>SaveMode</code>, which specifies how to handle existing data if
| present. It is important to realize that these save modes do not utilize any locking and are not
| atomic. Thus, it is not safe to have multiple writers attempting to write to the same location.
| Additionally, when performing an <code>Overwrite</code>, the data will be deleted before writing out the
| new data.</p>
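| 
| <p>For example, a save mode can be supplied when saving the <code>df</code> from the examples above. The snippet
| below is a minimal, illustrative sketch that assumes the <code>save</code> overload taking a path, a source name,
| and a <code>SaveMode</code>:</p>
| 
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">import org.apache.spark.sql.SaveMode
| 
| // Replace any data already stored at the target path.
| df.select("name", "age").save("namesAndAges.parquet", "parquet", SaveMode.Overwrite)
| 
| // Silently skip the save if the target path already contains data.
| df.select("name", "age").save("namesAndAges.parquet", "parquet", SaveMode.Ignore)</code></pre></div>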
| |
| <table class="table"> |
| <tr><th>Scala/Java</th><th>Python</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>SaveMode.ErrorIfExists</code> (default)</td> |
| <td><code>"error"</code> (default)</td> |
| <td> |
| When saving a DataFrame to a data source, if data already exists, |
| an exception is expected to be thrown. |
| </td> |
| </tr> |
| <tr> |
| <td><code>SaveMode.Append</code></td> |
| <td><code>"append"</code></td> |
| <td> |
| When saving a DataFrame to a data source, if data/table already exists, |
| contents of the DataFrame are expected to be appended to existing data. |
| </td> |
| </tr> |
| <tr> |
| <td><code>SaveMode.Overwrite</code></td> |
| <td><code>"overwrite"</code></td> |
| <td> |
| Overwrite mode means that when saving a DataFrame to a data source, |
| if data/table already exists, existing data is expected to be overwritten by the contents of |
| the DataFrame. |
| </td> |
| </tr> |
| <tr> |
| <td><code>SaveMode.Ignore</code></td> |
| <td><code>"ignore"</code></td> |
| <td> |
| Ignore mode means that when saving a DataFrame to a data source, if data already exists, |
| the save operation is expected to not save the contents of the DataFrame and to not |
| change the existing data. This is similar to <code>CREATE TABLE IF NOT EXISTS</code> in SQL.
| </td> |
| </tr> |
| </table> |
| |
| <h3 id="saving-to-persistent-tables">Saving to Persistent Tables</h3> |
| |
| <p>When working with a <code>HiveContext</code>, <code>DataFrames</code> can also be saved as persistent tables using the |
| <code>saveAsTable</code> command. Unlike the <code>registerTempTable</code> command, <code>saveAsTable</code> will materialize the |
| contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables
| will still exist even after your Spark program has restarted, as long as you maintain your connection |
| to the same metastore. A DataFrame for a persistent table can be created by calling the <code>table</code> |
| method on a <code>SQLContext</code> with the name of the table.</p> |
| |
| <p>By default <code>saveAsTable</code> will create a “managed table”, meaning that the location of the data will |
| be controlled by the metastore. Managed tables will also have their data deleted automatically |
| when a table is dropped.</p> |
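| 
| <p>As a brief sketch of this workflow (assuming <code>hiveContext</code> is a <code>HiveContext</code> constructed as
| shown in the Hive Tables section below, <code>df</code> is a DataFrame created with it, and <code>people_table</code>
| is a hypothetical table name):</p>
| 
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Materialize the DataFrame as a persistent table in the Hive metastore.
| df.saveAsTable("people_table")
| 
| // Later, possibly from a different Spark application connected to the same metastore,
| // recover a DataFrame that points at the persisted data.
| val people = hiveContext.table("people_table")</code></pre></div>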
| |
| <h2 id="parquet-files">Parquet Files</h2> |
| |
| <p><a href="http://parquet.io">Parquet</a> is a columnar format that is supported by many other data processing systems. |
| Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema |
| of the original data.</p> |
| |
| <h3 id="loading-data-programmatically">Loading Data Programmatically</h3> |
| |
| <p>Using the data from the above example:</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sqlContext from the previous example is used in this example.</span> |
| <span class="c1">// This is used to implicitly convert an RDD to a DataFrame.</span> |
| <span class="k">import</span> <span class="nn">sqlContext.implicits._</span> |
| |
| <span class="k">val</span> <span class="n">people</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">Person</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> <span class="c1">// An RDD of case class objects, from the previous example.</span> |
| |
| <span class="c1">// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">saveAsParquetFile</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">)</span> |
| |
| <span class="c1">// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.</span> |
| <span class="c1">// The result of loading a Parquet file is also a DataFrame.</span> |
| <span class="k">val</span> <span class="n">parquetFile</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">parquetFile</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">)</span> |
| |
| <span class="c1">// Parquet files can also be registered as tables and then used in SQL statements.</span>
| <span class="n">parquetFile</span><span class="o">.</span><span class="n">registerTempTable</span><span class="o">(</span><span class="s">"parquetFile"</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">teenagers</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"</span><span class="o">)</span> |
| <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sqlContext from the previous example is used in this example.</span> |
| |
| <span class="n">DataFrame</span> <span class="n">schemaPeople</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// The DataFrame from the previous example.</span> |
| |
| <span class="c1">// DataFrames can be saved as Parquet files, maintaining the schema information.</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="na">saveAsParquetFile</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">);</span> |
| |
| <span class="c1">// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.</span> |
| <span class="c1">// The result of loading a parquet file is also a DataFrame.</span> |
| <span class="n">DataFrame</span> <span class="n">parquetFile</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">parquetFile</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">);</span> |
| |
| <span class="c1">// Parquet files can also be registered as tables and then used in SQL statements.</span>
| <span class="n">parquetFile</span><span class="o">.</span><span class="na">registerTempTable</span><span class="o">(</span><span class="s">"parquetFile"</span><span class="o">);</span> |
| <span class="n">DataFrame</span> <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"</span><span class="o">);</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">teenagerNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">call</span><span class="o">(</span><span class="n">Row</span> <span class="n">row</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}).</span><span class="na">collect</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sqlContext from the previous example is used in this example.</span> |
| |
| <span class="n">schemaPeople</span> <span class="c"># The DataFrame from the previous example.</span> |
| |
| <span class="c"># DataFrames can be saved as Parquet files, maintaining the schema information.</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="n">saveAsParquetFile</span><span class="p">(</span><span class="s">"people.parquet"</span><span class="p">)</span> |
| |
| <span class="c"># Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.</span> |
| <span class="c"># The result of loading a parquet file is also a DataFrame.</span> |
| <span class="n">parquetFile</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">parquetFile</span><span class="p">(</span><span class="s">"people.parquet"</span><span class="p">)</span> |
| |
| <span class="c"># Parquet files can also be registered as tables and then used in SQL statements.</span> |
| <span class="n">parquetFile</span><span class="o">.</span><span class="n">registerTempTable</span><span class="p">(</span><span class="s">"parquetFile"</span><span class="p">)</span>
| <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"</span><span class="p">)</span> |
| <span class="n">teenNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">teenName</span> <span class="ow">in</span> <span class="n">teenNames</span><span class="o">.</span><span class="n">collect</span><span class="p">():</span> |
| <span class="k">print</span> <span class="n">teenName</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="sql"> |
| |
| <div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">TEMPORARY</span> <span class="k">TABLE</span> <span class="n">parquetTable</span> |
| <span class="k">USING</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">spark</span><span class="p">.</span><span class="k">sql</span><span class="p">.</span><span class="n">parquet</span> |
| <span class="k">OPTIONS</span> <span class="p">(</span> |
| <span class="n">path</span> <span class="ss">"examples/src/main/resources/people.parquet"</span> |
| <span class="p">)</span> |
| |
| <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">parquetTable</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="partition-discovery">Partition discovery</h3> |
| |
| <p>Table partitioning is a common optimization approach used in systems like Hive. In a partitioned |
| table, data are usually stored in different directories, with partitioning column values encoded in |
| the path of each partition directory. The Parquet data source is now able to discover and infer |
| partitioning information automatically. For example, we can store all our previously used
| population data into a partitioned table using the following directory structure, with two extra
| columns, <code>gender</code> and <code>country</code>, as partitioning columns:</p>
| |
| <div class="highlight"><pre><code class="language-text" data-lang="text">path
| └── to
|     └── table
|         ├── gender=male
|         │   ├── ...
|         │   │
|         │   ├── country=US
|         │   │   └── data.parquet
|         │   ├── country=CN
|         │   │   └── data.parquet
|         │   └── ...
|         └── gender=female
|             ├── ...
|             │
|             ├── country=US
|             │   └── data.parquet
|             ├── country=CN
|             │   └── data.parquet
|             └── ...</code></pre></div>
| |
| <p>By passing <code>path/to/table</code> to either <code>SQLContext.parquetFile</code> or <code>SQLContext.load</code>, Spark SQL will |
| automatically extract the partitioning information from the paths. Now the schema of the returned |
| DataFrame becomes:</p> |
| |
| <div class="highlight"><pre><code class="language-text" data-lang="text">root |
| |-- name: string (nullable = true) |
| |-- age: long (nullable = true) |
| |-- gender: string (nullable = true) |
| |-- country: string (nullable = true)</code></pre></div> |
| |
| <p>Notice that the data types of the partitioning columns are automatically inferred. Currently, |
| numeric data types and string type are supported.</p> |
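| 
| <p>As a small sketch of the above (assuming the hypothetical directory layout exists under <code>path/to/table</code>;
| <code>filter</code> and <code>show</code> are ordinary DataFrame operations used here only for illustration):</p>
| 
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Both calls discover the gender and country partitioning columns from the directory names.
| val byParquetFile = sqlContext.parquetFile("path/to/table")
| val byLoad = sqlContext.load("path/to/table", "parquet")
| 
| // The discovered partitioning columns behave like any other column, e.g. for filtering.
| byLoad.filter(byLoad("country") === "US").show()</code></pre></div>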
| |
| <h3 id="schema-merging">Schema merging</h3> |
| |
| <p>Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with |
| a simple schema, and gradually add more columns to the schema as needed. In this way, users may end |
| up with multiple Parquet files with different but mutually compatible schemas. The Parquet data |
| source is now able to automatically detect this case and merge schemas of all these files.</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sqlContext from the previous example is used in this example.</span> |
| <span class="c1">// This is used to implicitly convert an RDD to a DataFrame.</span> |
| <span class="k">import</span> <span class="nn">sqlContext.implicits._</span> |
| |
| <span class="c1">// Create a simple DataFrame, stored into a partition directory</span> |
| <span class="k">val</span> <span class="n">df1</span> <span class="k">=</span> <span class="n">sparkContext</span><span class="o">.</span><span class="n">makeRDD</span><span class="o">(</span><span class="mi">1</span> <span class="n">to</span> <span class="mi">5</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">i</span> <span class="k">=></span> <span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">i</span> <span class="o">*</span> <span class="mi">2</span><span class="o">)).</span><span class="n">toDF</span><span class="o">(</span><span class="s">"single"</span><span class="o">,</span> <span class="s">"double"</span><span class="o">)</span> |
| <span class="n">df1</span><span class="o">.</span><span class="n">saveAsParquetFile</span><span class="o">(</span><span class="s">"data/test_table/key=1"</span><span class="o">)</span> |
| |
| <span class="c1">// Create another DataFrame in a new partition directory,</span> |
| <span class="c1">// adding a new column and dropping an existing column</span> |
| <span class="k">val</span> <span class="n">df2</span> <span class="k">=</span> <span class="n">sparkContext</span><span class="o">.</span><span class="n">makeRDD</span><span class="o">(</span><span class="mi">6</span> <span class="n">to</span> <span class="mi">10</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">i</span> <span class="k">=></span> <span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">i</span> <span class="o">*</span> <span class="mi">3</span><span class="o">)).</span><span class="n">toDF</span><span class="o">(</span><span class="s">"single"</span><span class="o">,</span> <span class="s">"triple"</span><span class="o">)</span> |
| <span class="n">df2</span><span class="o">.</span><span class="n">saveAsParquetFile</span><span class="o">(</span><span class="s">"data/test_table/key=2"</span><span class="o">)</span> |
| |
| <span class="c1">// Read the partitioned table</span> |
| <span class="k">val</span> <span class="n">df3</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">parquetFile</span><span class="o">(</span><span class="s">"data/test_table"</span><span class="o">)</span> |
| <span class="n">df3</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span> |
| |
| <span class="c1">// The final schema consists of all 3 columns in the Parquet files together</span> |
| <span class="c1">// with the partitioning column that appears in the partition directory paths.</span>
| <span class="c1">// root</span> |
| <span class="c1">// |-- single: int (nullable = true)</span> |
| <span class="c1">// |-- double: int (nullable = true)</span> |
| <span class="c1">// |-- triple: int (nullable = true)</span> |
| <span class="c1">// |-- key : int (nullable = true)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sqlContext from the previous example is used in this example.</span> |
| |
| <span class="c"># Create a simple DataFrame, stored into a partition directory</span> |
| <span class="n">df1</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">6</span><span class="p">))</span>\ |
| <span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">single</span><span class="o">=</span><span class="n">i</span><span class="p">,</span> <span class="n">double</span><span class="o">=</span><span class="n">i</span> <span class="o">*</span> <span class="mi">2</span><span class="p">)))</span> |
| <span class="n">df1</span><span class="o">.</span><span class="n">save</span><span class="p">(</span><span class="s">"data/test_table/key=1"</span><span class="p">,</span> <span class="s">"parquet"</span><span class="p">)</span> |
| |
| <span class="c"># Create another DataFrame in a new partition directory,</span> |
| <span class="c"># adding a new column and dropping an existing column</span> |
| <span class="n">df2</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="mi">6</span><span class="p">,</span> <span class="mi">11</span><span class="p">))</span> |
| <span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">single</span><span class="o">=</span><span class="n">i</span><span class="p">,</span> <span class="n">triple</span><span class="o">=</span><span class="n">i</span> <span class="o">*</span> <span class="mi">3</span><span class="p">)))</span> |
| <span class="n">df2</span><span class="o">.</span><span class="n">save</span><span class="p">(</span><span class="s">"data/test_table/key=2"</span><span class="p">,</span> <span class="s">"parquet"</span><span class="p">)</span> |
| |
| <span class="c"># Read the partitioned table</span> |
| <span class="n">df3</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">parquetFile</span><span class="p">(</span><span class="s">"data/test_table"</span><span class="p">)</span> |
| <span class="n">df3</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| |
| <span class="c"># The final schema consists of all 3 columns in the Parquet files together</span> |
| <span class="c"># with the partitioning column that appears in the partition directory paths.</span>
| <span class="c"># root</span> |
| <span class="c"># |-- single: int (nullable = true)</span> |
| <span class="c"># |-- double: int (nullable = true)</span> |
| <span class="c"># |-- triple: int (nullable = true)</span> |
| <span class="c"># |-- key : int (nullable = true)</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="configuration">Configuration</h3> |
| |
| <p>Configuration of Parquet can be done using the <code>setConf</code> method on <code>SQLContext</code> or by running |
| <code>SET key=value</code> commands using SQL.</p> |
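| 
| <p>For instance, either of the following (a minimal sketch using the <code>sqlContext</code> from the earlier examples)
| sets one of the options listed in the table below:</p>
| 
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// Programmatically, via setConf:
| sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
| 
| // Or through SQL, via a SET command:
| sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")</code></pre></div>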
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>spark.sql.parquet.binaryAsString</code></td> |
| <td>false</td> |
| <td> |
| Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do |
| not differentiate between binary data and strings when writing out the Parquet schema. This |
| flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.int96AsTimestamp</code></td> |
| <td>true</td> |
| <td> |
| Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark also
| stores Timestamp as INT96 to avoid losing precision in the nanoseconds field. This
| flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.cacheMetadata</code></td> |
| <td>true</td> |
| <td> |
| Turns on caching of Parquet schema metadata. Can speed up querying of static data. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.compression.codec</code></td> |
| <td>gzip</td> |
| <td> |
| Sets the compression codec used when writing Parquet files. Acceptable values include:
| uncompressed, snappy, gzip, lzo. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.filterPushdown</code></td> |
| <td>false</td> |
| <td> |
| Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known |
| bug in Parquet 1.6.0rc3 (<a href="https://issues.apache.org/jira/browse/PARQUET-136">PARQUET-136</a>).
| However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn |
| this feature on. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.hive.convertMetastoreParquet</code></td> |
| <td>true</td> |
| <td> |
| When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in
| support.
| </td> |
| </tr> |
| </table> |
| |
| <h2 id="json-datasets">JSON Datasets</h2> |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| <p>Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. |
| This conversion can be done using one of two methods in a <code>SQLContext</code>:</p> |
| |
| <ul> |
| <li><code>jsonFile</code> - loads data from a directory of JSON files where each line of the files is a JSON object.</li> |
| <li><code>jsonRDD</code> - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.</li> |
| </ul> |
| |
| <p>Note that the file that is offered as <em>jsonFile</em> is not a typical JSON file. Each |
| line must contain a separate, self-contained valid JSON object. As a consequence, |
| a regular multi-line JSON file will most often fail.</p> |
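| 
| <p>For instance, a dataset in the expected format keeps one complete JSON object per line, along these lines
| (illustrative content; the bundled <code>people.json</code> file may differ):</p>
| 
| <div class="highlight"><pre><code class="language-text" data-lang="text">{"name":"Michael"}
| {"name":"Andy", "age":30}
| {"name":"Justin", "age":19}</code></pre></div>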
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sc is an existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// A JSON dataset is pointed to by path.</span> |
| <span class="c1">// The path can be either a single text file or a directory storing text files.</span> |
| <span class="k">val</span> <span class="n">path</span> <span class="k">=</span> <span class="s">"examples/src/main/resources/people.json"</span> |
| <span class="c1">// Create a DataFrame from the file(s) pointed to by path</span> |
| <span class="k">val</span> <span class="n">people</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonFile</span><span class="o">(</span><span class="n">path</span><span class="o">)</span> |
| |
| <span class="c1">// The inferred schema can be visualized using the printSchema() method.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: integer (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Register this DataFrame as a table.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by sqlContext.</span> |
| <span class="k">val</span> <span class="n">teenagers</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="o">)</span> |
| |
| <span class="c1">// Alternatively, a DataFrame can be created for a JSON dataset represented by</span> |
| <span class="c1">// an RDD[String] storing one JSON object per string.</span> |
| <span class="k">val</span> <span class="n">anotherPeopleRDD</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span> |
| <span class="s">"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""</span> <span class="o">::</span> <span class="nc">Nil</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">anotherPeople</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonRDD</span><span class="o">(</span><span class="n">anotherPeopleRDD</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| <p>Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. |
| This conversion can be done using one of two methods in a <code>SQLContext</code>:</p>
| |
| <ul> |
| <li><code>jsonFile</code> - loads data from a directory of JSON files where each line of the files is a JSON object.</li> |
| <li><code>jsonRDD</code> - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.</li> |
| </ul> |
| |
| <p>Note that the file that is offered as <em>jsonFile</em> is not a typical JSON file. Each |
| line must contain a separate, self-contained valid JSON object. As a consequence, |
| a regular multi-line JSON file will most often fail.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sc is an existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="c1">// A JSON dataset is pointed to by path.</span> |
| <span class="c1">// The path can be either a single text file or a directory storing text files.</span> |
| <span class="n">String</span> <span class="n">path</span> <span class="o">=</span> <span class="s">"examples/src/main/resources/people.json"</span><span class="o">;</span> |
| <span class="c1">// Create a DataFrame from the file(s) pointed to by path</span> |
| <span class="n">DataFrame</span> <span class="n">people</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">jsonFile</span><span class="o">(</span><span class="n">path</span><span class="o">);</span> |
| |
| <span class="c1">// The inferred schema can be visualized using the printSchema() method.</span> |
| <span class="n">people</span><span class="o">.</span><span class="na">printSchema</span><span class="o">();</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: integer (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Register this DataFrame as a table.</span> |
| <span class="n">people</span><span class="o">.</span><span class="na">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by sqlContext.</span> |
| <span class="n">DataFrame</span> <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="o">);</span> |
| |
| <span class="c1">// Alternatively, a DataFrame can be created for a JSON dataset represented by</span> |
| <span class="c1">// an RDD[String] storing one JSON object per string.</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">jsonData</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> |
| <span class="s">"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"</span><span class="o">);</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">anotherPeopleRDD</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">parallelize</span><span class="o">(</span><span class="n">jsonData</span><span class="o">);</span> |
| <span class="n">DataFrame</span> <span class="n">anotherPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">jsonRDD</span><span class="o">(</span><span class="n">anotherPeopleRDD</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| <p>Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. |
| This conversion can be done using one of two methods in a <code>SQLContext</code>:</p> |
| |
| <ul> |
| <li><code>jsonFile</code> - loads data from a directory of JSON files where each line of the files is a JSON object.</li> |
| <li><code>jsonRDD</code> - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.</li> |
| </ul> |
| |
| <p>Note that the file that is offered as <em>jsonFile</em> is not a typical JSON file. Each |
| line must contain a separate, self-contained valid JSON object. As a consequence, |
| a regular multi-line JSON file will most often fail.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sc is an existing SparkContext.</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="c"># A JSON dataset is pointed to by path.</span> |
| <span class="c"># The path can be either a single text file or a directory storing text files.</span> |
| <span class="n">path</span> <span class="o">=</span> <span class="s">"examples/src/main/resources/people.json"</span> |
| <span class="c"># Create a DataFrame from the file(s) pointed to by path</span> |
| <span class="n">people</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonFile</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> |
| |
| <span class="c"># The inferred schema can be visualized using the printSchema() method.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| <span class="c"># root</span> |
| <span class="c"># |-- age: integer (nullable = true)</span> |
| <span class="c"># |-- name: string (nullable = true)</span> |
| |
| <span class="c"># Register this DataFrame as a table.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">registerTempTable</span><span class="p">(</span><span class="s">"people"</span><span class="p">)</span> |
| |
| <span class="c"># SQL statements can be run by using the sql methods provided by `sqlContext`.</span> |
| <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="p">)</span> |
| |
| <span class="c"># Alternatively, a DataFrame can be created for a JSON dataset represented by</span> |
| <span class="c"># an RDD[String] storing one JSON object per string.</span> |
| <span class="n">anotherPeopleRDD</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">([</span> |
| <span class="s">'{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'</span><span class="p">])</span> |
| <span class="n">anotherPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonRDD</span><span class="p">(</span><span class="n">anotherPeopleRDD</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="sql"> |
| |
| <div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">TEMPORARY</span> <span class="k">TABLE</span> <span class="n">jsonTable</span> |
| <span class="k">USING</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">spark</span><span class="p">.</span><span class="k">sql</span><span class="p">.</span><span class="n">json</span> |
| <span class="k">OPTIONS</span> <span class="p">(</span> |
| <span class="n">path</span> <span class="ss">"examples/src/main/resources/people.json"</span> |
| <span class="p">)</span> |
| |
| <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">jsonTable</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="hive-tables">Hive Tables</h2> |
| |
| <p>Spark SQL also supports reading and writing data stored in <a href="http://hive.apache.org/">Apache Hive</a>. |
| However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. |
| Hive support is enabled by adding the <code>-Phive</code> and <code>-Phive-thriftserver</code> flags to Spark’s build. |
| Building with these flags produces a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present
| on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries |
| (SerDes) in order to access data stored in Hive.</p> |
| |
| <p>Configuration of Hive is done by placing your <code>hive-site.xml</code> file in <code>conf/</code>.</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <p>When working with Hive one must construct a <code>HiveContext</code>, which inherits from <code>SQLContext</code>, and |
| adds support for finding tables in the MetaStore and writing queries using HiveQL. Users who do |
| not have an existing Hive deployment can still create a <code>HiveContext</code>. When not configured by
| <code>hive-site.xml</code>, the context automatically creates <code>metastore_db</code> and <code>warehouse</code> in the current
| directory.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sc is an existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">hive</span><span class="o">.</span><span class="nc">HiveContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"</span><span class="o">)</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"</span><span class="o">)</span> |
| |
| <span class="c1">// Queries are expressed in HiveQL</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"FROM src SELECT key, value"</span><span class="o">).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>When working with Hive one must construct a <code>HiveContext</code>, which inherits from <code>SQLContext</code>, and |
| adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to |
| the <code>sql</code> method, a <code>HiveContext</code> also provides an <code>hql</code> method, which allows queries to be |
| expressed in HiveQL.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sc is an existing JavaSparkContext.</span> |
| <span class="n">HiveContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">hive</span><span class="o">.</span><span class="na">HiveContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"</span><span class="o">);</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"</span><span class="o">);</span> |
| |
| <span class="c1">// Queries are expressed in HiveQL.</span> |
| <span class="n">Row</span><span class="o">[]</span> <span class="n">results</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"FROM src SELECT key, value"</span><span class="o">).</span><span class="na">collect</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>When working with Hive one must construct a <code>HiveContext</code>, which inherits from <code>SQLContext</code>, and |
| adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to |
| the <code>sql</code> method, a <code>HiveContext</code> also provides an <code>hql</code> method, which allows queries to be |
| expressed in HiveQL.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sc is an existing SparkContext.</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">HiveContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">HiveContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"</span><span class="p">)</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"</span><span class="p">)</span> |
| |
| <span class="c"># Queries can be expressed in HiveQL.</span> |
| <span class="n">results</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"FROM src SELECT key, value"</span><span class="p">)</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h2 id="jdbc-to-other-databases">JDBC To Other Databases</h2> |
| |
| <p>Spark SQL also includes a data source that can read data from other databases using JDBC. This |
| functionality should be preferred over using <a href="api/scala/index.html#org.apache.spark.rdd.JdbcRDD">JdbcRDD</a>. |
| This is because the results are returned |
| as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources. |
| The JDBC data source is also easier to use from Java or Python as it does not require the user to |
| provide a ClassTag. |
| (Note that this is different from the Spark SQL JDBC server, which allows other applications to |
| run queries using Spark SQL).</p> |
| |
| <p>To get started you will need to include the JDBC driver for your particular database on the |
| Spark classpath. For example, to connect to PostgreSQL from the Spark Shell you would run the |
| following command:</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">SPARK_CLASSPATH</span><span class="o">=</span>postgresql-9.3-1102-jdbc41.jar bin/spark-shell</code></pre></div> |
| |
| <p>Tables from the remote database can be loaded as a DataFrame or Spark SQL temporary table using |
| the Data Sources API. The following options are supported:</p> |
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>url</code></td> |
| <td> |
| The JDBC URL to connect to. |
| </td> |
| </tr> |
| <tr> |
| <td><code>dbtable</code></td> |
| <td> |
| The JDBC table that should be read. Note that anything that is valid in a <code>FROM</code> clause of |
| a SQL query can be used. For example, instead of a full table you could also use a |
| subquery in parentheses. |
| </td> |
| </tr> |
| |
| <tr> |
| <td><code>driver</code></td> |
| <td> |
| The class name of the JDBC driver needed to connect to this URL. This class will be loaded |
| on the master and workers before running any JDBC commands to allow the driver to |
| register itself with the JDBC subsystem. |
| </td> |
| </tr> |
| <tr> |
| <td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td> |
| <td> |
| These options must all be specified if any of them is specified. They describe how to |
| partition the table when reading in parallel from multiple workers. |
| <code>partitionColumn</code> must be a numeric column from the table in question |
| (see the sketch after this table). |
| </td> |
| </tr> |
| </table> |
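| |
| <p>As a rough sketch of the partitioning options above (the table name <code>schema.tablename</code>, the numeric |
| column <code>id</code>, and the bounds are placeholders), a parallel read in Scala might look like:</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">val jdbcPartitionedDF = sqlContext.load("jdbc", Map( |
|   "url" -> "jdbc:postgresql:dbserver", |
|   "dbtable" -> "schema.tablename", |
|   "partitionColumn" -> "id",   // must be a numeric column |
|   "lowerBound" -> "1", |
|   "upperBound" -> "100000", |
|   "numPartitions" -> "10"))</code></pre></div> |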
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">jdbcDF</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">load</span><span class="o">(</span><span class="s">"jdbc"</span><span class="o">,</span> <span class="nc">Map</span><span class="o">(</span> |
| <span class="s">"url"</span> <span class="o">-></span> <span class="s">"jdbc:postgresql:dbserver"</span><span class="o">,</span> |
| <span class="s">"dbtable"</span> <span class="o">-></span> <span class="s">"schema.tablename"</span><span class="o">))</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>();</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"url"</span><span class="o">,</span> <span class="s">"jdbc:postgresql:dbserver"</span><span class="o">);</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"dbtable"</span><span class="o">,</span> <span class="s">"schema.tablename"</span><span class="o">);</span> |
| |
| <span class="n">DataFrame</span> <span class="n">jdbcDF</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">load</span><span class="o">(</span><span class="s">"jdbc"</span><span class="o">,</span> <span class="n">options</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s">"jdbc"</span><span class="p">,</span> <span class="n">url</span><span class="o">=</span><span class="s">"jdbc:postgresql:dbserver"</span><span class="p">,</span> <span class="n">dbtable</span><span class="o">=</span><span class="s">"schema.tablename"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="sql"> |
| |
| <div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">TEMPORARY</span> <span class="k">TABLE</span> <span class="n">jdbcTable</span> |
| <span class="k">USING</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">spark</span><span class="p">.</span><span class="k">sql</span><span class="p">.</span><span class="n">jdbc</span> |
| <span class="k">OPTIONS</span> <span class="p">(</span> |
| <span class="n">url</span> <span class="ss">"jdbc:postgresql:dbserver"</span><span class="p">,</span> |
| <span class="n">dbtable</span> <span class="ss">"schema.tablename"</span> |
| <span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h2 id="troubleshooting">Troubleshooting</h2> |
| |
| <ul> |
| <li>The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java’s DriverManager class does a security check that causes it to ignore all drivers not visible to the primordial class loader when a connection is opened. One convenient way to do this is to modify <code>compute_classpath.sh</code> on all worker nodes to include your driver JARs.</li> |
| <li>Some databases, such as H2, convert all names to upper case. You’ll need to use upper case to refer to those names in Spark SQL.</li> |
| </ul> |
| |
| <h1 id="performance-tuning">Performance Tuning</h1> |
| |
| <p>For some workloads it is possible to improve performance by either caching data in memory, or by |
| turning on some experimental options.</p> |
| |
| <h2 id="caching-data-in-memory">Caching Data In Memory</h2> |
| |
| <p>Spark SQL can cache tables using an in-memory columnar format by calling <code>sqlContext.cacheTable("tableName")</code> or <code>dataFrame.cache()</code>. |
| Then Spark SQL will scan only required columns and will automatically tune compression to minimize |
| memory usage and GC pressure. You can call <code>sqlContext.uncacheTable("tableName")</code> to remove the table from memory.</p> |
| |
| <p>Configuration of in-memory caching can be done using the <code>setConf</code> method on <code>SQLContext</code> or by running |
| <code>SET key=value</code> commands using SQL.</p> |
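| |
| <p>For example (a minimal sketch in Scala, assuming a table named <code>people</code> has already been registered):</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// sqlContext is an existing SQLContext (or HiveContext). |
| sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "20000")  // optional tuning |
| sqlContext.cacheTable("people")                      // cache using the in-memory columnar format |
| sqlContext.sql("SELECT name FROM people").collect()  // only the required columns are scanned |
| sqlContext.uncacheTable("people")                    // remove the table from memory</code></pre></div> |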
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td> |
| <td>true</td> |
| <td> |
| When set to true Spark SQL will automatically select a compression codec for each column based |
| on statistics of the data. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td> |
| <td>10000</td> |
| <td> |
| Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization |
| and compression, but risk OOMs when caching data. |
| </td> |
| </tr> |
| |
| </table> |
| |
| <h2 id="other-configuration-options">Other Configuration Options</h2> |
| |
| <p>The following options can also be used to tune the performance of query execution. It is possible |
| that these options will be deprecated in a future release as more optimizations are performed automatically.</p> |
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>spark.sql.autoBroadcastJoinThreshold</code></td> |
| <td>10485760 (10 MB)</td> |
| <td> |
| Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when |
| performing a join. Broadcasting can be disabled by setting this value to -1. Note that currently |
| statistics are only supported for Hive Metastore tables where the command |
| <code>ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan</code> has been run. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.codegen</code></td> |
| <td>false</td> |
| <td> |
| When true, code will be dynamically generated at runtime for expression evaluation in a specific |
| query. For some queries with complicated expressions this option can lead to significant speed-ups. |
| However, for simple queries this can actually slow down query execution. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.shuffle.partitions</code></td> |
| <td>200</td> |
| <td> |
| Configures the number of partitions to use when shuffling data for joins or aggregations. |
| </td> |
| </tr> |
| </table> |
| |
| <h1 id="distributed-sql-engine">Distributed SQL Engine</h1> |
| |
| <p>Spark SQL can also act as a distributed query engine using its JDBC/ODBC or command-line interface. In this mode, end-users or applications can interact with Spark SQL directly to run SQL queries, without the need to write any code.</p> |
| |
| <h2 id="running-the-thrift-jdbcodbc-server">Running the Thrift JDBC/ODBC server</h2> |
| |
| <p>The Thrift JDBC/ODBC server implemented here corresponds to the <a href="https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2"><code>HiveServer2</code></a> |
| in Hive 0.13. You can test the JDBC server with the beeline script that comes with either Spark or Hive 0.13.</p> |
| |
| <p>To start the JDBC/ODBC server, run the following in the Spark directory:</p> |
| |
| <pre><code>./sbin/start-thriftserver.sh |
| </code></pre> |
| |
| <p>This script accepts all <code>bin/spark-submit</code> command line options, plus a <code>--hiveconf</code> option to |
| specify Hive properties. You may run <code>./sbin/start-thriftserver.sh --help</code> for a complete list of |
| all available options. By default, the server listens on localhost:10000. You may override this |
| behaviour via either environment variables, i.e.:</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nb">export </span><span class="nv">HIVE_SERVER2_THRIFT_PORT</span><span class="o">=</span><listening-port> |
| <span class="nb">export </span><span class="nv">HIVE_SERVER2_THRIFT_BIND_HOST</span><span class="o">=</span><listening-host> |
| ./sbin/start-thriftserver.sh <span class="se">\</span> |
| --master <master-uri> <span class="se">\</span> |
| ...</code></pre></div> |
| |
| <p>or system properties:</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash">./sbin/start-thriftserver.sh <span class="se">\</span> |
| --hiveconf hive.server2.thrift.port<span class="o">=</span><listening-port> <span class="se">\</span> |
| --hiveconf hive.server2.thrift.bind.host<span class="o">=</span><listening-host> <span class="se">\</span> |
| --master <master-uri> |
| ...</code></pre></div> |
| |
| <p>Now you can use beeline to test the Thrift JDBC/ODBC server:</p> |
| |
| <pre><code>./bin/beeline |
| </code></pre> |
| |
| <p>Connect to the JDBC/ODBC server in beeline with:</p> |
| |
| <pre><code>beeline> !connect jdbc:hive2://localhost:10000 |
| </code></pre> |
| |
| <p>Beeline will ask you for a username and password. In non-secure mode, simply enter the username on |
| your machine and a blank password. For secure mode, please follow the instructions given in the |
| <a href="https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients">beeline documentation</a>.</p> |
| |
| <p>Configuration of Hive is done by placing your <code>hive-site.xml</code> file in <code>conf/</code>.</p> |
| |
| <p>You may also use the beeline script that comes with Hive.</p> |
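| |
| <p>Applications can also connect to the Thrift server programmatically through any Hive 0.13 compatible JDBC client. |
| As a minimal sketch in Scala (assuming the Hive JDBC driver, e.g. the <code>hive-jdbc</code> artifact and its |
| dependencies, is on the application classpath):</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">import java.sql.DriverManager |
| |
| // Make sure the HiveServer2 JDBC driver is registered. |
| Class.forName("org.apache.hive.jdbc.HiveDriver") |
| |
| // Connect to the Thrift JDBC/ODBC server started above (non-secure mode: any username, blank password). |
| val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "") |
| val stmt = conn.createStatement() |
| val rs = stmt.executeQuery("SHOW TABLES") |
| while (rs.next()) println(rs.getString(1)) |
| conn.close()</code></pre></div> |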
| |
| <p>The Thrift JDBC/ODBC server also supports sending Thrift RPC messages over HTTP transport. |
| Use the following settings to enable HTTP mode, either as system properties or in the <code>hive-site.xml</code> file in <code>conf/</code>:</p> |
| |
| <pre><code>hive.server2.transport.mode - Set this to value: http |
| hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001 |
| hive.server2.http.endpoint - HTTP endpoint; default is cliservice |
| </code></pre> |
| |
| <p>To test, use beeline to connect to the JDBC/ODBC server in HTTP mode with:</p> |
| |
| <pre><code>beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint> |
| </code></pre> |
| |
| <h2 id="running-the-spark-sql-cli">Running the Spark SQL CLI</h2> |
| |
| <p>The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute |
| queries input from the command line. Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.</p> |
| |
| <p>To start the Spark SQL CLI, run the following in the Spark directory:</p> |
| |
| <pre><code>./bin/spark-sql |
| </code></pre> |
| |
| <p>Configuration of Hive is done by placing your <code>hive-site.xml</code> file in <code>conf/</code>. |
| You may run <code>./bin/spark-sql --help</code> for a complete list of all available |
| options.</p> |
| |
| <h1 id="migration-guide">Migration Guide</h1> |
| |
| <h2 id="upgrading-from-spark-sql-10-12-to-13">Upgrading from Spark SQL 1.0-1.2 to 1.3</h2> |
| |
| <p>In Spark 1.3 we removed the “Alpha” label from Spark SQL and as part of this did a cleanup of the |
| available APIs. From Spark 1.3 onwards, Spark SQL will provide binary compatibility with other |
| releases in the 1.X series. This compatibility guarantee excludes APIs that are explicitly marked |
| as unstable (i.e., DeveloperAPI or Experimental).</p> |
| |
| <h4 id="rename-of-schemardd-to-dataframe">Rename of SchemaRDD to DataFrame</h4> |
| |
| <p>The largest change that users will notice when upgrading to Spark SQL 1.3 is that <code>SchemaRDD</code> has |
| been renamed to <code>DataFrame</code>. This is primarily because DataFrames no longer inherit from RDD |
| directly, but instead provide most of the functionality that RDDs provide through their own |
| implementation. DataFrames can still be converted to RDDs by calling the <code>.rdd</code> method.</p> |
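| |
| <p>For example (a sketch, where <code>df</code> is an existing <code>DataFrame</code>):</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">// .rdd yields an RDD[Row] for RDD-style processing. |
| val rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = df.rdd</code></pre></div> |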
| |
| <p>In Scala there is a type alias from <code>SchemaRDD</code> to <code>DataFrame</code> to provide source compatibility for |
| some use cases. It is still recommended that users update their code to use <code>DataFrame</code> instead. |
| Java and Python users will need to update their code.</p> |
| |
| <h4 id="unification-of-the-java-and-scala-apis">Unification of the Java and Scala APIs</h4> |
| |
| <p>Prior to Spark 1.3 there were separate Java compatible classes (<code>JavaSQLContext</code> and <code>JavaSchemaRDD</code>) |
| that mirrored the Scala API. In Spark 1.3 the Java API and Scala API have been unified. Users |
| of either language should use <code>SQLContext</code> and <code>DataFrame</code>. In general these classes try to |
| use types that are usable from both languages (i.e. <code>Array</code> instead of language specific collections). |
| In some cases where no common type exists (e.g., for passing in closures or Maps) function overloading |
| is used instead.</p> |
| |
| <p>Additionally the Java specific types API has been removed. Users of both Scala and Java should |
| use the classes present in <code>org.apache.spark.sql.types</code> to describe schema programmatically.</p> |
| |
| <h4 id="isolation-of-implicit-conversions-and-removal-of-dsl-package-scala-only">Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)</h4> |
| |
| <p>Many of the code examples prior to Spark 1.3 started with <code>import sqlContext._</code>, which brought |
| all of the functions from sqlContext into scope. In Spark 1.3 we have isolated the implicit |
| conversions for converting <code>RDD</code>s into <code>DataFrame</code>s into an object inside of the <code>SQLContext</code>. |
| Users should now write <code>import sqlContext.implicits._</code>.</p> |
| |
| <p>Additionally, the implicit conversions now only augment RDDs that are composed of <code>Product</code>s (i.e., |
| case classes or tuples) with a method <code>toDF</code>, instead of applying automatically.</p> |
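| |
| <p>A minimal sketch of the new imports (assuming an existing <code>SQLContext</code> named <code>sqlContext</code> and |
| a case class defined at the top level):</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">case class Person(name: String, age: Int) |
| |
| // Brings the toDF method into scope for RDDs of Products (case classes, tuples). |
| import sqlContext.implicits._ |
| |
| val peopleDF = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 25))).toDF()</code></pre></div> |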
| |
| <p>When using functions inside of the DSL (now replaced with the <code>DataFrame</code> API) users used to import |
| <code>org.apache.spark.sql.catalyst.dsl</code>. Instead, the public DataFrame functions API should be used: |
| <code>import org.apache.spark.sql.functions._</code>.</p> |
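| |
| <p>For example (a sketch, where <code>df</code> is an existing <code>DataFrame</code> with an <code>age</code> column):</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">import org.apache.spark.sql.functions._ |
| |
| df.agg(avg("age"), max("age")).show()</code></pre></div> |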
| |
| <h4 id="removal-of-the-type-aliases-in-orgapachesparksql-for-datatype-scala-only">Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)</h4> |
| |
| <p>Spark 1.3 removes the type aliases that were present in the base sql package for <code>DataType</code>. Users |
| should instead import the classes in <code>org.apache.spark.sql.types</code>.</p> |
| |
| <h4 id="udf-registration-moved-to-sqlcontextudf-java--scala">UDF Registration Moved to <code>sqlContext.udf</code> (Java & Scala)</h4> |
| |
| <p>Functions that are used to register UDFs, either for use in the DataFrame DSL or SQL, have been |
| moved into the <code>udf</code> object in <code>SQLContext</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">sqlCtx</span><span class="o">.</span><span class="na">udf</span><span class="o">.</span><span class="na">register</span><span class="o">(</span><span class="s">"strLen"</span><span class="o">,</span> <span class="o">(</span><span class="nl">s:</span> <span class="n">String</span><span class="o">)</span> <span class="o">=></span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">())</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">sqlCtx</span><span class="o">.</span><span class="na">udf</span><span class="o">().</span><span class="na">register</span><span class="o">(</span><span class="s">"strLen"</span><span class="o">,</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">-></span> <span class="o">{</span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">();</span> <span class="o">});</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
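| |
| <p>Once registered, the UDF can be invoked from SQL. For example (a sketch, assuming a registered table |
| <code>people</code> with a <code>name</code> column):</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala">sqlCtx.sql("SELECT strLen(name) FROM people").collect()</code></pre></div> |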
| |
| <p>Python UDF registration is unchanged.</p> |
| |
| <h4 id="python-datatypes-no-longer-singletons">Python DataTypes No Longer Singletons</h4> |
| |
| <p>When using DataTypes in Python you will need to construct them (i.e. <code>StringType()</code>) instead of |
| referencing a singleton.</p> |
| |
| <h2 id="migration-guide-for-shark-user">Migration Guide for Shark User</h2> |
| |
| <h3 id="scheduling">Scheduling</h3> |
| <p>To set a <a href="job-scheduling.html#fair-scheduler-pools">Fair Scheduler</a> pool for a JDBC client session, |
| users can set the <code>spark.sql.thriftserver.scheduler.pool</code> variable:</p> |
| |
| <pre><code>SET spark.sql.thriftserver.scheduler.pool=accounting; |
| </code></pre> |
| |
| <h3 id="reducer-number">Reducer number</h3> |
| |
| <p>In Shark, the default reducer number is 1 and is controlled by the property <code>mapred.reduce.tasks</code>. Spark |
| SQL deprecates this property in favor of <code>spark.sql.shuffle.partitions</code>, whose default value |
| is 200. Users may customize this property via <code>SET</code>:</p> |
| |
| <pre><code>SET spark.sql.shuffle.partitions=10; |
| SELECT page, count(*) c |
| FROM logs_last_month_cached |
| GROUP BY page ORDER BY c DESC LIMIT 10; |
| </code></pre> |
| |
| <p>You may also put this property in <code>hive-site.xml</code> to override the default value.</p> |
| |
| <p>For now, the <code>mapred.reduce.tasks</code> property is still recognized, and is converted to |
| <code>spark.sql.shuffle.partitions</code> automatically.</p> |
| |
| <h3 id="caching">Caching</h3> |
| |
| <p>The <code>shark.cache</code> table property no longer exists, and tables whose names end with <code>_cached</code> are no |
| longer automatically cached. Instead, we provide <code>CACHE TABLE</code> and <code>UNCACHE TABLE</code> statements to |
| let users control table caching explicitly:</p> |
| |
| <pre><code>CACHE TABLE logs_last_month; |
| UNCACHE TABLE logs_last_month; |
| </code></pre> |
| |
| <p><strong>NOTE:</strong> <code>CACHE TABLE tbl</code> is now <strong>eager</strong> by default, not <strong>lazy</strong>. You no longer need to trigger cache materialization manually.</p> |
| |
| <p>Since Spark 1.2.0, Spark SQL provides a statement that lets users control whether table caching is lazy or eager:</p> |
| |
| <pre><code>CACHE [LAZY] TABLE [AS SELECT] ... |
| </code></pre> |
| |
| <p>Several caching related features are not supported yet:</p> |
| |
| <ul> |
| <li>User defined partition level cache eviction policy</li> |
| <li>RDD reloading</li> |
| <li>In-memory cache write through policy</li> |
| </ul> |
| |
| <h2 id="compatibility-with-apache-hive">Compatibility with Apache Hive</h2> |
| |
| <p>Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Spark |
| SQL is based on Hive 0.12.0 and 0.13.1.</p> |
| |
| <h4 id="deploying-in-existing-hive-warehouses">Deploying in Existing Hive Warehouses</h4> |
| |
| <p>The Spark SQL Thrift JDBC server is designed to be “out of the box” compatible with existing Hive |
| installations. You do not need to modify your existing Hive Metastore or change the data placement |
| or partitioning of your tables.</p> |
| |
| <h3 id="supported-hive-features">Supported Hive Features</h3> |
| |
| <p>Spark SQL supports the vast majority of Hive features, such as:</p> |
| |
| <ul> |
| <li>Hive query statements, including: |
| <ul> |
| <li><code>SELECT</code></li> |
| <li><code>GROUP BY</code></li> |
| <li><code>ORDER BY</code></li> |
| <li><code>CLUSTER BY</code></li> |
| <li><code>SORT BY</code></li> |
| </ul> |
| </li> |
| <li>All Hive operators, including: |
| <ul> |
| <li>Relational operators (<code>=</code>, <code><=></code>, <code>==</code>, <code><></code>, <code><</code>, <code>></code>, <code>>=</code>, <code><=</code>, etc)</li> |
| <li>Arithmetic operators (<code>+</code>, <code>-</code>, <code>*</code>, <code>/</code>, <code>%</code>, etc)</li> |
| <li>Logical operators (<code>AND</code>, <code>&&</code>, <code>OR</code>, <code>||</code>, etc)</li> |
| <li>Complex type constructors</li> |
| <li>Mathematical functions (<code>sign</code>, <code>ln</code>, <code>cos</code>, etc)</li> |
| <li>String functions (<code>instr</code>, <code>length</code>, <code>printf</code>, etc)</li> |
| </ul> |
| </li> |
| <li>User defined functions (UDF)</li> |
| <li>User defined aggregation functions (UDAF)</li> |
| <li>User defined serialization formats (SerDes)</li> |
| <li>Joins |
| <ul> |
| <li><code>JOIN</code></li> |
| <li><code>{LEFT|RIGHT|FULL} OUTER JOIN</code></li> |
| <li><code>LEFT SEMI JOIN</code></li> |
| <li><code>CROSS JOIN</code></li> |
| </ul> |
| </li> |
| <li>Unions</li> |
| <li>Sub-queries |
| <ul> |
| <li><code>SELECT col FROM ( SELECT a + b AS col from t1) t2</code></li> |
| </ul> |
| </li> |
| <li>Sampling</li> |
| <li>Explain</li> |
| <li>Partitioned tables</li> |
| <li>View</li> |
| <li>All Hive DDL Functions, including: |
| <ul> |
| <li><code>CREATE TABLE</code></li> |
| <li><code>CREATE TABLE AS SELECT</code></li> |
| <li><code>ALTER TABLE</code></li> |
| </ul> |
| </li> |
| <li>Most Hive Data types, including: |
| <ul> |
| <li><code>TINYINT</code></li> |
| <li><code>SMALLINT</code></li> |
| <li><code>INT</code></li> |
| <li><code>BIGINT</code></li> |
| <li><code>BOOLEAN</code></li> |
| <li><code>FLOAT</code></li> |
| <li><code>DOUBLE</code></li> |
| <li><code>STRING</code></li> |
| <li><code>BINARY</code></li> |
| <li><code>TIMESTAMP</code></li> |
| <li><code>DATE</code></li> |
| <li><code>ARRAY<></code></li> |
| <li><code>MAP<></code></li> |
| <li><code>STRUCT<></code></li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h3 id="unsupported-hive-functionality">Unsupported Hive Functionality</h3> |
| |
| <p>Below is a list of Hive features that we don’t support yet. Most of these features are rarely used |
| in Hive deployments.</p> |
| |
| <p><strong>Major Hive Features</strong></p> |
| |
| <ul> |
| <li>Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL |
| doesn’t support buckets yet.</li> |
| </ul> |
| |
| <p><strong>Esoteric Hive Features</strong></p> |
| |
| <ul> |
|   <li><code>UNION</code> type</li> |
|   <li>Unique join</li> |
|   <li>Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at |
|   the moment and only supports populating the sizeInBytes field of the hive metastore.</li> |
| </ul> |
| |
| <p><strong>Hive Input/Output Formats</strong></p> |
| |
| <ul> |
| <li>File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.</li> |
| <li>Hadoop archive</li> |
| </ul> |
| |
| <p><strong>Hive Optimizations</strong></p> |
| |
| <p>A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are |
| less important due to Spark SQL’s in-memory computational model. Others are slotted for future |
| releases of Spark SQL.</p> |
| |
| <ul> |
| <li>Block level bitmap indexes and virtual columns (used to build indexes)</li> |
| <li>Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you |
| need to control the degree of parallelism post-shuffle using “<code>SET spark.sql.shuffle.partitions=[num_tasks];</code>”.</li> |
| <li>Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still |
| launches tasks to compute the result.</li> |
| <li>Skew data flag: Spark SQL does not follow the skew data flags in Hive.</li> |
| <li><code>STREAMTABLE</code> hint in join: Spark SQL does not follow the <code>STREAMTABLE</code> hint.</li> |
| <li>Merge multiple small files for query results: if the result output contains multiple small files, |
| Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS |
| metadata. Spark SQL does not support that.</li> |
| </ul> |
| |
| <h1 id="data-types">Data Types</h1> |
| |
| <p>Spark SQL and DataFrames support the following data types:</p> |
| |
| <ul> |
| <li>Numeric types |
| <ul> |
| <li><code>ByteType</code>: Represents 1-byte signed integer numbers. |
| The range of numbers is from <code>-128</code> to <code>127</code>.</li> |
| <li><code>ShortType</code>: Represents 2-byte signed integer numbers. |
| The range of numbers is from <code>-32768</code> to <code>32767</code>.</li> |
| <li><code>IntegerType</code>: Represents 4-byte signed integer numbers. |
| The range of numbers is from <code>-2147483648</code> to <code>2147483647</code>.</li> |
| <li><code>LongType</code>: Represents 8-byte signed integer numbers. |
| The range of numbers is from <code>-9223372036854775808</code> to <code>9223372036854775807</code>.</li> |
| <li><code>FloatType</code>: Represents 4-byte single-precision floating point numbers.</li> |
| <li><code>DoubleType</code>: Represents 8-byte double-precision floating point numbers.</li> |
| <li><code>DecimalType</code>: Represents arbitrary-precision signed decimal numbers. Backed internally by <code>java.math.BigDecimal</code>. A <code>BigDecimal</code> consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.</li> |
| </ul> |
| </li> |
| <li>String type |
| <ul> |
| <li><code>StringType</code>: Represents character string values.</li> |
| </ul> |
| </li> |
| <li>Binary type |
| <ul> |
| <li><code>BinaryType</code>: Represents byte sequence values.</li> |
| </ul> |
| </li> |
| <li>Boolean type |
| <ul> |
| <li><code>BooleanType</code>: Represents boolean values.</li> |
| </ul> |
| </li> |
| <li>Datetime type |
| <ul> |
| <li><code>TimestampType</code>: Represents values comprising values of fields year, month, day, |
| hour, minute, and second.</li> |
| <li><code>DateType</code>: Represents values comprising values of fields year, month, day.</li> |
| </ul> |
| </li> |
| <li>Complex types |
| <ul> |
| <li><code>ArrayType(elementType, containsNull)</code>: Represents values comprising a sequence of |
| elements with the type of <code>elementType</code>. <code>containsNull</code> is used to indicate if |
| elements in an <code>ArrayType</code> value can have <code>null</code> values.</li> |
| <li><code>MapType(keyType, valueType, valueContainsNull)</code>: |
| Represents values comprising a set of key-value pairs. The data type of keys is |
| described by <code>keyType</code> and the data type of values is described by <code>valueType</code>. |
| For a <code>MapType</code> value, keys are not allowed to have <code>null</code> values. <code>valueContainsNull</code> |
| is used to indicate if values of a <code>MapType</code> value can have <code>null</code> values.</li> |
| <li><code>StructType(fields)</code>: Represents values with the structure described by |
| a sequence of <code>StructField</code>s (<code>fields</code>). |
| <ul> |
| <li><code>StructField(name, dataType, nullable)</code>: Represents a field in a <code>StructType</code>. |
| The name of a field is indicated by <code>name</code>. The data type of a field is indicated |
| by <code>dataType</code>. <code>nullable</code> is used to indicate if values of this field can have |
| <code>null</code> values.</li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| </ul> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <p>All data types of Spark SQL are located in the package <code>org.apache.spark.sql.types</code>. |
| You can access them by doing</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark.sql.types._</span></code></pre></div> |
| |
| <table class="table"> |
| <tr> |
| <th style="width:20%">Data type</th> |
| <th style="width:40%">Value type in Scala</th> |
| <th>API to access or create a data type</th></tr> |
| <tr> |
| <td> <b>ByteType</b> </td> |
| <td> Byte </td> |
| <td> |
| ByteType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ShortType</b> </td> |
| <td> Short </td> |
| <td> |
| ShortType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>IntegerType</b> </td> |
| <td> Int </td> |
| <td> |
| IntegerType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>LongType</b> </td> |
| <td> Long </td> |
| <td> |
| LongType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>FloatType</b> </td> |
| <td> Float </td> |
| <td> |
| FloatType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DoubleType</b> </td> |
| <td> Double </td> |
| <td> |
| DoubleType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DecimalType</b> </td> |
| <td> java.math.BigDecimal </td> |
| <td> |
| DecimalType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StringType</b> </td> |
| <td> String </td> |
| <td> |
| StringType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BinaryType</b> </td> |
| <td> Array[Byte] </td> |
| <td> |
| BinaryType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BooleanType</b> </td> |
| <td> Boolean </td> |
| <td> |
| BooleanType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>TimestampType</b> </td> |
| <td> java.sql.Timestamp </td> |
| <td> |
| TimestampType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DateType</b> </td> |
| <td> java.sql.Date </td> |
| <td> |
| DateType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ArrayType</b> </td> |
| <td> scala.collection.Seq </td> |
| <td> |
| ArrayType(<i>elementType</i>, [<i>containsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>containsNull</i> is <i>true</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>MapType</b> </td> |
| <td> scala.collection.Map </td> |
| <td> |
| MapType(<i>keyType</i>, <i>valueType</i>, [<i>valueContainsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>true</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructType</b> </td> |
| <td> org.apache.spark.sql.Row </td> |
| <td> |
| StructType(<i>fields</i>)<br /> |
| <b>Note:</b> <i>fields</i> is a Seq of StructFields. Also, two fields with the same |
| name are not allowed. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructField</b> </td> |
| <td> The value type in Scala of the data type of this field |
| (For example, Int for a StructField with the data type IntegerType) </td> |
| <td> |
| StructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>) |
| </td> |
| </tr> |
| </table> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>All data types of Spark SQL are located in the package of |
| <code>org.apache.spark.sql.types</code>. To access or create a data type, |
| please use factory methods provided in |
| <code>org.apache.spark.sql.types.DataTypes</code>.</p> |
| |
| <table class="table"> |
| <tr> |
| <th style="width:20%">Data type</th> |
| <th style="width:40%">Value type in Java</th> |
| <th>API to access or create a data type</th></tr> |
| <tr> |
| <td> <b>ByteType</b> </td> |
| <td> byte or Byte </td> |
| <td> |
| DataTypes.ByteType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ShortType</b> </td> |
| <td> short or Short </td> |
| <td> |
| DataTypes.ShortType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>IntegerType</b> </td> |
| <td> int or Integer </td> |
| <td> |
| DataTypes.IntegerType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>LongType</b> </td> |
| <td> long or Long </td> |
| <td> |
| DataTypes.LongType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>FloatType</b> </td> |
| <td> float or Float </td> |
| <td> |
| DataTypes.FloatType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DoubleType</b> </td> |
| <td> double or Double </td> |
| <td> |
| DataTypes.DoubleType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DecimalType</b> </td> |
| <td> java.math.BigDecimal </td> |
| <td> |
| DataTypes.createDecimalType()<br /> |
| DataTypes.createDecimalType(<i>precision</i>, <i>scale</i>). |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StringType</b> </td> |
| <td> String </td> |
| <td> |
| DataTypes.StringType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BinaryType</b> </td> |
| <td> byte[] </td> |
| <td> |
| DataTypes.BinaryType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BooleanType</b> </td> |
| <td> boolean or Boolean </td> |
| <td> |
| DataTypes.BooleanType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>TimestampType</b> </td> |
| <td> java.sql.Timestamp </td> |
| <td> |
| DataTypes.TimestampType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DateType</b> </td> |
| <td> java.sql.Date </td> |
| <td> |
| DataTypes.DateType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ArrayType</b> </td> |
| <td> java.util.List </td> |
| <td> |
| DataTypes.createArrayType(<i>elementType</i>)<br /> |
| <b>Note:</b> The value of <i>containsNull</i> will be <i>true</i><br /> |
| DataTypes.createArrayType(<i>elementType</i>, <i>containsNull</i>). |
| </td> |
| </tr> |
| <tr> |
| <td> <b>MapType</b> </td> |
| <td> java.util.Map </td> |
| <td> |
| DataTypes.createMapType(<i>keyType</i>, <i>valueType</i>)<br /> |
| <b>Note:</b> The value of <i>valueContainsNull</i> will be <i>true</i>.<br /> |
| DataTypes.createMapType(<i>keyType</i>, <i>valueType</i>, <i>valueContainsNull</i>)<br /> |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructType</b> </td> |
| <td> org.apache.spark.sql.Row </td> |
| <td> |
| DataTypes.createStructType(<i>fields</i>)<br /> |
| <b>Note:</b> <i>fields</i> is a List or an array of StructFields. |
| Also, two fields with the same name are not allowed. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructField</b> </td> |
| <td> The value type in Java of the data type of this field |
| (For example, int for a StructField with the data type IntegerType) </td> |
| <td> |
| DataTypes.createStructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>) |
| </td> |
| </tr> |
| </table> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>All data types of Spark SQL are located in the package of <code>pyspark.sql.types</code>. |
| You can access them by doing</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="o">*</span></code></pre></div> |
| |
| <table class="table"> |
| <tr> |
| <th style="width:20%">Data type</th> |
| <th style="width:40%">Value type in Python</th> |
| <th>API to access or create a data type</th></tr> |
| <tr> |
| <td> <b>ByteType</b> </td> |
| <td> |
| int or long <br /> |
| <b>Note:</b> Numbers will be converted to 1-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of -128 to 127. |
| </td> |
| <td> |
| ByteType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ShortType</b> </td> |
| <td> |
| int or long <br /> |
| <b>Note:</b> Numbers will be converted to 2-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of -32768 to 32767. |
| </td> |
| <td> |
| ShortType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>IntegerType</b> </td> |
| <td> int or long </td> |
| <td> |
| IntegerType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>LongType</b> </td> |
| <td> |
| long <br /> |
| <b>Note:</b> Numbers will be converted to 8-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of |
| -9223372036854775808 to 9223372036854775807. |
| Otherwise, please convert data to decimal.Decimal and use DecimalType. |
| </td> |
| <td> |
| LongType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>FloatType</b> </td> |
| <td> |
| float <br /> |
| <b>Note:</b> Numbers will be converted to 4-byte single-precision floating |
| point numbers at runtime. |
| </td> |
| <td> |
| FloatType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DoubleType</b> </td> |
| <td> float </td> |
| <td> |
| DoubleType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DecimalType</b> </td> |
| <td> decimal.Decimal </td> |
| <td> |
| DecimalType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StringType</b> </td> |
| <td> string </td> |
| <td> |
| StringType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BinaryType</b> </td> |
| <td> bytearray </td> |
| <td> |
| BinaryType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BooleanType</b> </td> |
| <td> bool </td> |
| <td> |
| BooleanType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>TimestampType</b> </td> |
| <td> datetime.datetime </td> |
| <td> |
| TimestampType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DateType</b> </td> |
| <td> datetime.date </td> |
| <td> |
| DateType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ArrayType</b> </td> |
| <td> list, tuple, or array </td> |
| <td> |
| ArrayType(<i>elementType</i>, [<i>containsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>containsNull</i> is <i>True</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>MapType</b> </td> |
| <td> dict </td> |
| <td> |
| MapType(<i>keyType</i>, <i>valueType</i>, [<i>valueContainsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>True</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructType</b> </td> |
| <td> list or tuple </td> |
| <td> |
| StructType(<i>fields</i>)<br /> |
| <b>Note:</b> <i>fields</i> is a list of StructFields. Also, two fields with the same |
| name are not allowed. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructField</b> </td> |
| <td> The value type in Python of the data type of this field |
| (For example, int for a StructField with the data type IntegerType) </td> |
| <td> |
| StructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>) |
| </td> |
| </tr> |
| </table> |
| |
| </div> |
| |
| </div> |
| |
| |
| |
| </div> <!-- /container --> |
| |
| <script src="js/vendor/jquery-1.8.0.min.js"></script> |
| <script src="js/vendor/bootstrap.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function(d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function(){ |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], |
| displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |