| <!DOCTYPE html> |
| <!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--> |
| <!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]--> |
| <!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <title>Spark SQL and DataFrames - Spark 1.5.2 Documentation</title> |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <style> |
| body { |
| padding-top: 60px; |
| padding-bottom: 40px; |
| } |
| </style> |
| <meta name="viewport" content="width=device-width"> |
| <link rel="stylesheet" href="css/bootstrap-responsive.min.css"> |
| <link rel="stylesheet" href="css/main.css"> |
| |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| |
| |
| |
| |
| </head> |
| <body> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| |
| <div class="navbar navbar-fixed-top" id="topbar"> |
| <div class="navbar-inner"> |
| <div class="container"> |
| <div class="brand"><a href="index.html"> |
| <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.5.2</span> |
| </div> |
| <ul class="nav"> |
| <!--TODO(andyk): Add class="active" attribute to li some how.--> |
| <li><a href="index.html">Overview</a></li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="quick-start.html">Quick Start</a></li> |
| <li><a href="programming-guide.html">Spark Programming Guide</a></li> |
| <li class="divider"></li> |
| <li><a href="streaming-programming-guide.html">Spark Streaming</a></li> |
| <li><a href="sql-programming-guide.html">DataFrames and SQL</a></li> |
| <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li> |
| <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li> |
| <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li> |
| <li><a href="sparkr.html">SparkR (R on Spark)</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li> |
| <li><a href="api/java/index.html">Java</a></li> |
| <li><a href="api/python/index.html">Python</a></li> |
| <li><a href="api/R/index.html">R</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="cluster-overview.html">Overview</a></li> |
| <li><a href="submitting-applications.html">Submitting Applications</a></li> |
| <li class="divider"></li> |
| <li><a href="spark-standalone.html">Spark Standalone</a></li> |
| <li><a href="running-on-mesos.html">Mesos</a></li> |
| <li><a href="running-on-yarn.html">YARN</a></li> |
| <li class="divider"></li> |
| <li><a href="ec2-scripts.html">Amazon EC2</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="configuration.html">Configuration</a></li> |
| <li><a href="monitoring.html">Monitoring</a></li> |
| <li><a href="tuning.html">Tuning Guide</a></li> |
| <li><a href="job-scheduling.html">Job Scheduling</a></li> |
| <li><a href="security.html">Security</a></li> |
| <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li> |
| <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li> |
| <li class="divider"></li> |
| <li><a href="building-spark.html">Building Spark</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Supplemental+Spark+Projects">Supplemental Projects</a></li> |
| </ul> |
| </li> |
| </ul> |
| <!--<p class="navbar-text pull-right"><span class="version-text">v1.5.2</span></p>--> |
| </div> |
| </div> |
| </div> |
| |
| <div class="container" id="content"> |
| |
| <h1 class="title">Spark SQL and DataFrame Guide</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#overview" id="markdown-toc-overview">Overview</a></li> |
| <li><a href="#dataframes" id="markdown-toc-dataframes">DataFrames</a> <ul> |
| <li><a href="#starting-point-sqlcontext" id="markdown-toc-starting-point-sqlcontext">Starting Point: SQLContext</a></li> |
| <li><a href="#creating-dataframes" id="markdown-toc-creating-dataframes">Creating DataFrames</a></li> |
| <li><a href="#dataframe-operations" id="markdown-toc-dataframe-operations">DataFrame Operations</a></li> |
| <li><a href="#running-sql-queries-programmatically" id="markdown-toc-running-sql-queries-programmatically">Running SQL Queries Programmatically</a></li> |
| <li><a href="#interoperating-with-rdds" id="markdown-toc-interoperating-with-rdds">Interoperating with RDDs</a> <ul> |
| <li><a href="#inferring-the-schema-using-reflection" id="markdown-toc-inferring-the-schema-using-reflection">Inferring the Schema Using Reflection</a></li> |
| <li><a href="#programmatically-specifying-the-schema" id="markdown-toc-programmatically-specifying-the-schema">Programmatically Specifying the Schema</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li><a href="#data-sources" id="markdown-toc-data-sources">Data Sources</a> <ul> |
| <li><a href="#generic-loadsave-functions" id="markdown-toc-generic-loadsave-functions">Generic Load/Save Functions</a> <ul> |
| <li><a href="#manually-specifying-options" id="markdown-toc-manually-specifying-options">Manually Specifying Options</a></li> |
| <li><a href="#save-modes" id="markdown-toc-save-modes">Save Modes</a></li> |
| <li><a href="#saving-to-persistent-tables" id="markdown-toc-saving-to-persistent-tables">Saving to Persistent Tables</a></li> |
| </ul> |
| </li> |
| <li><a href="#parquet-files" id="markdown-toc-parquet-files">Parquet Files</a> <ul> |
| <li><a href="#loading-data-programmatically" id="markdown-toc-loading-data-programmatically">Loading Data Programmatically</a></li> |
| <li><a href="#partition-discovery" id="markdown-toc-partition-discovery">Partition Discovery</a></li> |
| <li><a href="#schema-merging" id="markdown-toc-schema-merging">Schema Merging</a></li> |
| <li><a href="#hive-metastore-parquet-table-conversion" id="markdown-toc-hive-metastore-parquet-table-conversion">Hive metastore Parquet table conversion</a> <ul> |
| <li><a href="#hiveparquet-schema-reconciliation" id="markdown-toc-hiveparquet-schema-reconciliation">Hive/Parquet Schema Reconciliation</a></li> |
| <li><a href="#metadata-refreshing" id="markdown-toc-metadata-refreshing">Metadata Refreshing</a></li> |
| </ul> |
| </li> |
| <li><a href="#configuration" id="markdown-toc-configuration">Configuration</a></li> |
| </ul> |
| </li> |
| <li><a href="#json-datasets" id="markdown-toc-json-datasets">JSON Datasets</a></li> |
| <li><a href="#hive-tables" id="markdown-toc-hive-tables">Hive Tables</a> <ul> |
| <li><a href="#interacting-with-different-versions-of-hive-metastore" id="markdown-toc-interacting-with-different-versions-of-hive-metastore">Interacting with Different Versions of Hive Metastore</a></li> |
| </ul> |
| </li> |
| <li><a href="#jdbc-to-other-databases" id="markdown-toc-jdbc-to-other-databases">JDBC To Other Databases</a></li> |
| <li><a href="#troubleshooting" id="markdown-toc-troubleshooting">Troubleshooting</a></li> |
| </ul> |
| </li> |
| <li><a href="#performance-tuning" id="markdown-toc-performance-tuning">Performance Tuning</a> <ul> |
| <li><a href="#caching-data-in-memory" id="markdown-toc-caching-data-in-memory">Caching Data In Memory</a></li> |
| <li><a href="#other-configuration-options" id="markdown-toc-other-configuration-options">Other Configuration Options</a></li> |
| </ul> |
| </li> |
| <li><a href="#distributed-sql-engine" id="markdown-toc-distributed-sql-engine">Distributed SQL Engine</a> <ul> |
| <li><a href="#running-the-thrift-jdbcodbc-server" id="markdown-toc-running-the-thrift-jdbcodbc-server">Running the Thrift JDBC/ODBC server</a></li> |
| <li><a href="#running-the-spark-sql-cli" id="markdown-toc-running-the-spark-sql-cli">Running the Spark SQL CLI</a></li> |
| </ul> |
| </li> |
| <li><a href="#migration-guide" id="markdown-toc-migration-guide">Migration Guide</a> <ul> |
| <li><a href="#upgrading-from-spark-sql-14-to-15" id="markdown-toc-upgrading-from-spark-sql-14-to-15">Upgrading From Spark SQL 1.4 to 1.5</a></li> |
| <li><a href="#upgrading-from-spark-sql-13-to-14" id="markdown-toc-upgrading-from-spark-sql-13-to-14">Upgrading from Spark SQL 1.3 to 1.4</a> <ul> |
| <li><a href="#dataframe-data-readerwriter-interface" id="markdown-toc-dataframe-data-readerwriter-interface">DataFrame data reader/writer interface</a></li> |
| <li><a href="#dataframegroupby-retains-grouping-columns" id="markdown-toc-dataframegroupby-retains-grouping-columns">DataFrame.groupBy retains grouping columns</a></li> |
| </ul> |
| </li> |
| <li><a href="#upgrading-from-spark-sql-10-12-to-13" id="markdown-toc-upgrading-from-spark-sql-10-12-to-13">Upgrading from Spark SQL 1.0-1.2 to 1.3</a> <ul> |
| <li><a href="#rename-of-schemardd-to-dataframe" id="markdown-toc-rename-of-schemardd-to-dataframe">Rename of SchemaRDD to DataFrame</a></li> |
| <li><a href="#unification-of-the-java-and-scala-apis" id="markdown-toc-unification-of-the-java-and-scala-apis">Unification of the Java and Scala APIs</a></li> |
| <li><a href="#isolation-of-implicit-conversions-and-removal-of-dsl-package-scala-only" id="markdown-toc-isolation-of-implicit-conversions-and-removal-of-dsl-package-scala-only">Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)</a></li> |
| <li><a href="#removal-of-the-type-aliases-in-orgapachesparksql-for-datatype-scala-only" id="markdown-toc-removal-of-the-type-aliases-in-orgapachesparksql-for-datatype-scala-only">Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)</a></li> |
| <li><a href="#udf-registration-moved-to-sqlcontextudf-java--scala" id="markdown-toc-udf-registration-moved-to-sqlcontextudf-java--scala">UDF Registration Moved to <code>sqlContext.udf</code> (Java & Scala)</a></li> |
| <li><a href="#python-datatypes-no-longer-singletons" id="markdown-toc-python-datatypes-no-longer-singletons">Python DataTypes No Longer Singletons</a></li> |
| </ul> |
| </li> |
| <li><a href="#migration-guide-for-shark-users" id="markdown-toc-migration-guide-for-shark-users">Migration Guide for Shark Users</a> <ul> |
| <li><a href="#scheduling" id="markdown-toc-scheduling">Scheduling</a></li> |
| <li><a href="#reducer-number" id="markdown-toc-reducer-number">Reducer number</a></li> |
| <li><a href="#caching" id="markdown-toc-caching">Caching</a></li> |
| </ul> |
| </li> |
| <li><a href="#compatibility-with-apache-hive" id="markdown-toc-compatibility-with-apache-hive">Compatibility with Apache Hive</a> <ul> |
| <li><a href="#deploying-in-existing-hive-warehouses" id="markdown-toc-deploying-in-existing-hive-warehouses">Deploying in Existing Hive Warehouses</a></li> |
| <li><a href="#supported-hive-features" id="markdown-toc-supported-hive-features">Supported Hive Features</a></li> |
| <li><a href="#unsupported-hive-functionality" id="markdown-toc-unsupported-hive-functionality">Unsupported Hive Functionality</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li><a href="#reference" id="markdown-toc-reference">Reference</a> <ul> |
| <li><a href="#data-types" id="markdown-toc-data-types">Data Types</a></li> |
| <li><a href="#nan-semantics" id="markdown-toc-nan-semantics">NaN Semantics</a></li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h1 id="overview">Overview</h1> |
| |
          <p>Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.</p>
| |
| <p>Spark SQL can also be used to read data from an existing Hive installation. For more on how to configure this feature, please refer to the <a href="#hive-tables">Hive Tables</a> section.</p> |
| |
| <h1 id="dataframes">DataFrames</h1> |
| |
| <p>A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.</p> |
| |
| <p>The DataFrame API is available in <a href="api/scala/index.html#org.apache.spark.sql.DataFrame">Scala</a>, <a href="api/java/index.html?org/apache/spark/sql/DataFrame.html">Java</a>, <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">Python</a>, and <a href="api/R/index.html">R</a>.</p> |
| |
| <p>All of the examples on this page use sample data included in the Spark distribution and can be run in the <code>spark-shell</code>, <code>pyspark</code> shell, or <code>sparkR</code> shell.</p> |
| |
| <h2 id="starting-point-sqlcontext">Starting Point: SQLContext</h2> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <p>The entry point into all functionality in Spark SQL is the |
| <a href="api/scala/index.html#org.apache.spark.sql.SQLContext"><code>SQLContext</code></a> class, or one of its |
| descendants. To create a basic <code>SQLContext</code>, all you need is a SparkContext.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sc</span><span class="k">:</span> <span class="kt">SparkContext</span> <span class="c1">// An existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// this is used to implicitly convert an RDD to a DataFrame.</span> |
| <span class="k">import</span> <span class="nn">sqlContext.implicits._</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>The entry point into all functionality in Spark SQL is the |
| <a href="api/java/index.html#org.apache.spark.sql.SQLContext"><code>SQLContext</code></a> class, or one of its |
| descendants. To create a basic <code>SQLContext</code>, all you need is a SparkContext.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaSparkContext</span> <span class="n">sc</span> <span class="o">=</span> <span class="o">...;</span> <span class="c1">// An existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>The entry point into all relational functionality in Spark is the |
| <a href="api/python/pyspark.sql.html#pyspark.sql.SQLContext"><code>SQLContext</code></a> class, or one |
of its descendants. To create a basic <code>SQLContext</code>, all you need is a SparkContext.</p>
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <p>The entry point into all relational functionality in Spark is the |
<code>SQLContext</code> class, or one of its descendants. To create a basic <code>SQLContext</code>, all you need is a SparkContext.</p>
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r">sqlContext <span class="o"><-</span> sparkRSQL.init<span class="p">(</span>sc<span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <p>In addition to the basic <code>SQLContext</code>, you can also create a <code>HiveContext</code>, which provides a |
| superset of the functionality provided by the basic <code>SQLContext</code>. Additional features include |
| the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the |
| ability to read data from Hive tables. To use a <code>HiveContext</code>, you do not need to have an |
| existing Hive setup, and all of the data sources available to a <code>SQLContext</code> are still available. |
| <code>HiveContext</code> is only packaged separately to avoid including all of Hive’s dependencies in the default |
Spark build. If these dependencies are not a problem for your application, then using <code>HiveContext</code>
is recommended. Future releases will focus on bringing <code>SQLContext</code> up
| to feature parity with a <code>HiveContext</code>.</p> |
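<p>For example, a <code>HiveContext</code> is created from an existing <code>SparkContext</code> in the same way as a <code>SQLContext</code>. The snippet below is a minimal Scala sketch and assumes the Hive classes (a build with <code>-Phive</code>, or a distribution that includes them) are on the classpath:</p>

<div class="highlight"><pre><code class="language-scala" data-lang="scala">// sc is an existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// A HiveContext accepts everything a SQLContext does, plus HiveQL statements and Hive UDFs.
hiveContext.sql("SHOW TABLES").show()</code></pre></div>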
| |
| <p>The specific variant of SQL that is used to parse queries can also be selected using the |
| <code>spark.sql.dialect</code> option. This parameter can be changed using either the <code>setConf</code> method on |
| a <code>SQLContext</code> or by using a <code>SET key=value</code> command in SQL. For a <code>SQLContext</code>, the only dialect |
| available is “sql” which uses a simple SQL parser provided by Spark SQL. In a <code>HiveContext</code>, the |
| default is “hiveql”, though “sql” is also available. Since the HiveQL parser is much more complete, |
| this is recommended for most use cases.</p> |
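<p>As a brief sketch of the two options described above (reusing the <code>sqlContext</code> from the earlier examples):</p>

<div class="highlight"><pre><code class="language-scala" data-lang="scala">// Programmatically, via setConf on the context:
sqlContext.setConf("spark.sql.dialect", "sql")

// Or from SQL, using a SET command ("hiveql" is only valid on a HiveContext):
sqlContext.sql("SET spark.sql.dialect=sql")</code></pre></div>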
| |
| <h2 id="creating-dataframes">Creating DataFrames</h2> |
| |
| <p>With a <code>SQLContext</code>, applications can create <code>DataFrame</code>s from an <a href="#interoperating-with-rdds">existing <code>RDD</code></a>, from a Hive table, or from <a href="#data-sources">data sources</a>.</p> |
| |
| <p>As an example, the following creates a <code>DataFrame</code> based on the content of a JSON file:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sc</span><span class="k">:</span> <span class="kt">SparkContext</span> <span class="c1">// An existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">)</span> |
| |
| <span class="c1">// Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="o">()</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaSparkContext</span> <span class="n">sc</span> <span class="o">=</span> <span class="o">...;</span> <span class="c1">// An existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">);</span> |
| |
| <span class="c1">// Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">show</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c"># Displays the content of the DataFrame to stdout</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="p">()</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r">sqlContext <span class="o"><-</span> SQLContext<span class="p">(</span>sc<span class="p">)</span> |
| |
| df <span class="o"><-</span> jsonFile<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c1"># Displays the content of the DataFrame to stdout</span> |
| showDF<span class="p">(</span>df<span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="dataframe-operations">DataFrame Operations</h2> |
| |
<p>DataFrames provide a domain-specific language for structured data manipulation in <a href="api/scala/index.html#org.apache.spark.sql.DataFrame">Scala</a>, <a href="api/java/index.html?org/apache/spark/sql/DataFrame.html">Java</a>, <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">Python</a>, and <a href="api/R/index.html">R</a>.</p>
| |
| <p>Here we include some basic examples of structured data processing using DataFrames:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sc</span><span class="k">:</span> <span class="kt">SparkContext</span> <span class="c1">// An existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// Create the DataFrame</span> |
| <span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">)</span> |
| |
| <span class="c1">// Show the content of the DataFrame</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// age name</span> |
| <span class="c1">// null Michael</span> |
| <span class="c1">// 30 Andy</span> |
| <span class="c1">// 19 Justin</span> |
| |
| <span class="c1">// Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: long (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// name</span> |
| <span class="c1">// Michael</span> |
| <span class="c1">// Andy</span> |
| <span class="c1">// Justin</span> |
| |
| <span class="c1">// Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="n">df</span><span class="o">(</span><span class="s">"name"</span><span class="o">),</span> <span class="n">df</span><span class="o">(</span><span class="s">"age"</span><span class="o">)</span> <span class="o">+</span> <span class="mi">1</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// name (age + 1)</span> |
| <span class="c1">// Michael null</span> |
| <span class="c1">// Andy 31</span> |
| <span class="c1">// Justin 20</span> |
| |
| <span class="c1">// Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">df</span><span class="o">(</span><span class="s">"age"</span><span class="o">)</span> <span class="o">></span> <span class="mi">21</span><span class="o">).</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// age name</span> |
| <span class="c1">// 30 Andy</span> |
| |
| <span class="c1">// Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="n">count</span><span class="o">().</span><span class="n">show</span><span class="o">()</span> |
| <span class="c1">// age count</span> |
| <span class="c1">// null 1</span> |
| <span class="c1">// 19 1</span> |
| <span class="c1">// 30 1</span></code></pre></div> |
| |
| <p>For a complete list of the types of operations that can be performed on a DataFrame refer to the <a href="api/scala/index.html#org.apache.spark.sql.DataFrame">API Documentation</a>.</p> |
| |
| <p>In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/scala/index.html#org.apache.spark.sql.functions$">DataFrame Function Reference</a>.</p> |
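<p>As a small sketch reusing the <code>df</code> defined above, these functions live in <code>org.apache.spark.sql.functions</code> and compose with column expressions:</p>

<div class="highlight"><pre><code class="language-scala" data-lang="scala">import org.apache.spark.sql.functions._

// Upper-case the "name" column alongside an ordinary column expression.
df.select(upper(df("name")), df("age") + 1).show()

// Aggregate with a built-in function.
df.agg(avg(df("age"))).show()</code></pre></div>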
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaSparkContext</span> <span class="n">sc</span> <span class="c1">// An existing SparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// Create the DataFrame</span> |
| <span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">);</span> |
| |
| <span class="c1">// Show the content of the DataFrame</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// age name</span> |
| <span class="c1">// null Michael</span> |
| <span class="c1">// 30 Andy</span> |
| <span class="c1">// 19 Justin</span> |
| |
| <span class="c1">// Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">printSchema</span><span class="o">();</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: long (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// name</span> |
| <span class="c1">// Michael</span> |
| <span class="c1">// Andy</span> |
| <span class="c1">// Justin</span> |
| |
| <span class="c1">// Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">df</span><span class="o">.</span><span class="na">col</span><span class="o">(</span><span class="s">"name"</span><span class="o">),</span> <span class="n">df</span><span class="o">.</span><span class="na">col</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="na">plus</span><span class="o">(</span><span class="mi">1</span><span class="o">)).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// name (age + 1)</span> |
| <span class="c1">// Michael null</span> |
| <span class="c1">// Andy 31</span> |
| <span class="c1">// Justin 20</span> |
| |
| <span class="c1">// Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">df</span><span class="o">.</span><span class="na">col</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="na">gt</span><span class="o">(</span><span class="mi">21</span><span class="o">)).</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// age name</span> |
| <span class="c1">// 30 Andy</span> |
| |
| <span class="c1">// Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"age"</span><span class="o">).</span><span class="na">count</span><span class="o">().</span><span class="na">show</span><span class="o">();</span> |
| <span class="c1">// age count</span> |
| <span class="c1">// null 1</span> |
| <span class="c1">// 19 1</span> |
| <span class="c1">// 30 1</span></code></pre></div> |
| |
| <p>For a complete list of the types of operations that can be performed on a DataFrame refer to the <a href="api/java/org/apache/spark/sql/DataFrame.html">API Documentation</a>.</p> |
| |
| <p>In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/java/org/apache/spark/sql/functions.html">DataFrame Function Reference</a>.</p> |
| |
| </div> |
| |
| <div data-lang="python"> |
| <p>In Python it’s possible to access a DataFrame’s columns either by attribute |
| (<code>df.age</code>) or by indexing (<code>df['age']</code>). While the former is convenient for |
| interactive data exploration, users are highly encouraged to use the |
latter form, which is future-proof and won’t break with column names that
| are also attributes on the DataFrame class.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="c"># Create the DataFrame</span> |
| <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c"># Show the content of the DataFrame</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## age name</span> |
| <span class="c">## null Michael</span> |
| <span class="c">## 30 Andy</span> |
| <span class="c">## 19 Justin</span> |
| |
| <span class="c"># Print the schema in a tree format</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| <span class="c">## root</span> |
| <span class="c">## |-- age: long (nullable = true)</span> |
| <span class="c">## |-- name: string (nullable = true)</span> |
| |
| <span class="c"># Select only the "name" column</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"name"</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## name</span> |
| <span class="c">## Michael</span> |
| <span class="c">## Andy</span> |
| <span class="c">## Justin</span> |
| |
| <span class="c"># Select everybody, but increment the age by 1</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s">'name'</span><span class="p">],</span> <span class="n">df</span><span class="p">[</span><span class="s">'age'</span><span class="p">]</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## name (age + 1)</span> |
| <span class="c">## Michael null</span> |
| <span class="c">## Andy 31</span> |
| <span class="c">## Justin 20</span> |
| |
| <span class="c"># Select people older than 21</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">df</span><span class="p">[</span><span class="s">'age'</span><span class="p">]</span> <span class="o">></span> <span class="mi">21</span><span class="p">)</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## age name</span> |
| <span class="c">## 30 Andy</span> |
| |
| <span class="c"># Count people by age</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="p">(</span><span class="s">"age"</span><span class="p">)</span><span class="o">.</span><span class="n">count</span><span class="p">()</span><span class="o">.</span><span class="n">show</span><span class="p">()</span> |
| <span class="c">## age count</span> |
| <span class="c">## null 1</span> |
| <span class="c">## 19 1</span> |
| <span class="c">## 30 1</span></code></pre></div> |
| |
| <p>For a complete list of the types of operations that can be performed on a DataFrame refer to the <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame">API Documentation</a>.</p> |
| |
| <p>In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/python/pyspark.sql.html#module-pyspark.sql.functions">DataFrame Function Reference</a>.</p> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r">sqlContext <span class="o"><-</span> sparkRSQL.init<span class="p">(</span>sc<span class="p">)</span> |
| |
| <span class="c1"># Create the DataFrame</span> |
| df <span class="o"><-</span> jsonFile<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c1"># Show the content of the DataFrame</span> |
| showDF<span class="p">(</span>df<span class="p">)</span> |
| <span class="c1">## age name</span> |
| <span class="c1">## null Michael</span> |
| <span class="c1">## 30 Andy</span> |
| <span class="c1">## 19 Justin</span> |
| |
| <span class="c1"># Print the schema in a tree format</span> |
| printSchema<span class="p">(</span>df<span class="p">)</span> |
| <span class="c1">## root</span> |
| <span class="c1">## |-- age: long (nullable = true)</span> |
| <span class="c1">## |-- name: string (nullable = true)</span> |
| |
| <span class="c1"># Select only the "name" column</span> |
| showDF<span class="p">(</span>select<span class="p">(</span>df<span class="p">,</span> <span class="s">"name"</span><span class="p">))</span> |
| <span class="c1">## name</span> |
| <span class="c1">## Michael</span> |
| <span class="c1">## Andy</span> |
| <span class="c1">## Justin</span> |
| |
| <span class="c1"># Select everybody, but increment the age by 1</span> |
| showDF<span class="p">(</span>select<span class="p">(</span>df<span class="p">,</span> df<span class="o">$</span>name<span class="p">,</span> df<span class="o">$</span>age <span class="o">+</span> <span class="m">1</span><span class="p">))</span> |
| <span class="c1">## name (age + 1)</span> |
| <span class="c1">## Michael null</span> |
| <span class="c1">## Andy 31</span> |
| <span class="c1">## Justin 20</span> |
| |
| <span class="c1"># Select people older than 21</span> |
| showDF<span class="p">(</span>where<span class="p">(</span>df<span class="p">,</span> df<span class="o">$</span>age <span class="o">></span> <span class="m">21</span><span class="p">))</span> |
| <span class="c1">## age name</span> |
| <span class="c1">## 30 Andy</span> |
| |
| <span class="c1"># Count people by age</span> |
| showDF<span class="p">(</span>count<span class="p">(</span>groupBy<span class="p">(</span>df<span class="p">,</span> <span class="s">"age"</span><span class="p">)))</span> |
| <span class="c1">## age count</span> |
| <span class="c1">## null 1</span> |
| <span class="c1">## 19 1</span> |
| <span class="c1">## 30 1</span></code></pre></div> |
| |
| <p>For a complete list of the types of operations that can be performed on a DataFrame refer to the <a href="api/R/index.html">API Documentation</a>.</p> |
| |
| <p>In addition to simple column references and expressions, DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in the <a href="api/R/index.html">DataFrame Function Reference</a>.</p> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="running-sql-queries-programmatically">Running SQL Queries Programmatically</h2> |
| |
| <p>The <code>sql</code> function on a <code>SQLContext</code> enables applications to run SQL queries programmatically and returns the result as a <code>DataFrame</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="o">...</span> <span class="c1">// An existing SQLContext</span> |
| <span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT * FROM table"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// An existing SQLContext</span> |
| <span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT * FROM table"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT * FROM table"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r">sqlContext <span class="o"><-</span> sparkRSQL.init<span class="p">(</span>sc<span class="p">)</span> |
| df <span class="o"><-</span> sql<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"SELECT * FROM table"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="interoperating-with-rdds">Interoperating with RDDs</h2> |
| |
| <p>Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first |
| method uses reflection to infer the schema of an RDD that contains specific types of objects. This |
reflection-based approach leads to more concise code and works well when you already know the schema
| while writing your Spark application.</p> |
| |
| <p>The second method for creating DataFrames is through a programmatic interface that allows you to |
| construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows |
| you to construct DataFrames when the columns and their types are not known until runtime.</p> |
| |
| <h3 id="inferring-the-schema-using-reflection">Inferring the Schema Using Reflection</h3> |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <p>The Scala interface for Spark SQL supports automatically converting an RDD containing case classes |
| to a DataFrame. The case class |
| defines the schema of the table. The names of the arguments to the case class are read using |
| reflection and become the names of the columns. Case classes can also be nested or contain complex |
types such as Sequences or Arrays. Such an RDD can be implicitly converted to a DataFrame and then
| registered as a table. Tables can be used in subsequent SQL statements.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sc is an existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| <span class="c1">// this is used to implicitly convert an RDD to a DataFrame.</span> |
| <span class="k">import</span> <span class="nn">sqlContext.implicits._</span> |
| |
| <span class="c1">// Define the schema using a case class.</span> |
| <span class="c1">// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,</span> |
| <span class="c1">// you can use custom classes that implement the Product interface.</span> |
| <span class="k">case</span> <span class="k">class</span> <span class="nc">Person</span><span class="o">(</span><span class="n">name</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">age</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> |
| |
| <span class="c1">// Create an RDD of Person objects and register it as a table.</span> |
| <span class="k">val</span> <span class="n">people</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">)).</span><span class="n">map</span><span class="o">(</span><span class="n">p</span> <span class="k">=></span> <span class="nc">Person</span><span class="o">(</span><span class="n">p</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">p</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">trim</span><span class="o">.</span><span class="n">toInt</span><span class="o">)).</span><span class="n">toDF</span><span class="o">()</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by sqlContext.</span> |
| <span class="k">val</span> <span class="n">teenagers</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name, age FROM people WHERE age >= 13 AND age <= 19"</span><span class="o">)</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations.</span> |
| <span class="c1">// The columns of a row in the result can be accessed by field index:</span> |
| <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span> |
| |
| <span class="c1">// or by field name:</span> |
| <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">t</span><span class="o">.</span><span class="n">getAs</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="s">"name"</span><span class="o">)).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span> |
| |
| <span class="c1">// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]</span> |
| <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">getValuesMap</span><span class="o">[</span><span class="kt">Any</span><span class="o">](</span><span class="nc">List</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"age"</span><span class="o">))).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span> |
| <span class="c1">// Map("name" -> "Justin", "age" -> 19)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>Spark SQL supports automatically converting an RDD of <a href="http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly">JavaBeans</a> |
| into a DataFrame. The BeanInfo, obtained using reflection, defines the schema of the table. |
| Currently, Spark SQL does not support JavaBeans that contain |
nested beans or complex types such as Lists or Arrays. You can create a JavaBean by creating a
| class that implements Serializable and has getters and setters for all of its fields.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Person</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kt">int</span> <span class="n">age</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">getName</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">name</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setName</span><span class="o">(</span><span class="n">String</span> <span class="n">name</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">name</span> <span class="o">=</span> <span class="n">name</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">int</span> <span class="nf">getAge</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">age</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setAge</span><span class="o">(</span><span class="kt">int</span> <span class="n">age</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">this</span><span class="o">.</span><span class="na">age</span> <span class="o">=</span> <span class="n">age</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>A schema can be applied to an existing RDD by calling <code>createDataFrame</code> and providing the Class object |
| for the JavaBean.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sc is an existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="c1">// Load a text file and convert each line to a JavaBean.</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">Person</span><span class="o">></span> <span class="n">people</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">).</span><span class="na">map</span><span class="o">(</span> |
| <span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Person</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">Person</span> <span class="nf">call</span><span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">parts</span> <span class="o">=</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span> |
| |
| <span class="n">Person</span> <span class="n">person</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Person</span><span class="o">();</span> |
| <span class="n">person</span><span class="o">.</span><span class="na">setName</span><span class="o">(</span><span class="n">parts</span><span class="o">[</span><span class="mi">0</span><span class="o">]);</span> |
| <span class="n">person</span><span class="o">.</span><span class="na">setAge</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">parts</span><span class="o">[</span><span class="mi">1</span><span class="o">].</span><span class="na">trim</span><span class="o">()));</span> |
| |
| <span class="k">return</span> <span class="n">person</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">});</span> |
| |
| <span class="c1">// Apply a schema to an RDD of JavaBeans and register it as a table.</span> |
| <span class="n">DataFrame</span> <span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">people</span><span class="o">,</span> <span class="n">Person</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="na">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// SQL can be run over RDDs that have been registered as tables.</span> |
| <span class="n">DataFrame</span> <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="o">)</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations.</span> |
| <span class="c1">// The columns of a row in the result can be accessed by ordinal.</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">teenagerNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="na">javaRDD</span><span class="o">().</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">call</span><span class="o">(</span><span class="n">Row</span> <span class="n">row</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}).</span><span class="na">collect</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the data types. Rows are constructed by passing a list of
| key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, |
| and the types are inferred by looking at the first row. Since we currently only look at the first |
| row, it is important that there is no missing data in the first row of the RDD. In future versions we |
| plan to more completely infer the schema by looking at more data, similar to the inference that is |
| performed on JSON files.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sc is an existing SparkContext.</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span><span class="p">,</span> <span class="n">Row</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="c"># Load a text file and convert each line to a Row.</span> |
| <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="p">)</span> |
| <span class="n">parts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">l</span><span class="p">:</span> <span class="n">l</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s">","</span><span class="p">))</span> |
| <span class="n">people</span> <span class="o">=</span> <span class="n">parts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">p</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">age</span><span class="o">=</span><span class="nb">int</span><span class="p">(</span><span class="n">p</span><span class="p">[</span><span class="mi">1</span><span class="p">])))</span> |
| |
| <span class="c"># Infer the schema, and register the DataFrame as a table.</span> |
| <span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">people</span><span class="p">)</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="n">registerTempTable</span><span class="p">(</span><span class="s">"people"</span><span class="p">)</span> |
| |
| <span class="c"># SQL can be run over DataFrames that have been registered as a table.</span> |
| <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="p">)</span> |
| |
| <span class="c"># The results of SQL queries are RDDs and support all the normal RDD operations.</span> |
| <span class="n">teenNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">teenName</span> <span class="ow">in</span> <span class="n">teenNames</span><span class="o">.</span><span class="n">collect</span><span class="p">():</span> |
| <span class="k">print</span><span class="p">(</span><span class="n">teenName</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="programmatically-specifying-the-schema">Programmatically Specifying the Schema</h3> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <p>When case classes cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed |
| and fields will be projected differently for different users), |
| a <code>DataFrame</code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of <code>Row</code>s from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| <code>Row</code>s in the RDD created in Step 1.</li> |
| <li>Apply the schema to the RDD of <code>Row</code>s via the <code>createDataFrame</code> method provided
| by <code>SQLContext</code>.</li> |
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sc is an existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// Create an RDD</span> |
| <span class="k">val</span> <span class="n">people</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">)</span> |
| |
| <span class="c1">// The schema is encoded in a string</span> |
| <span class="k">val</span> <span class="n">schemaString</span> <span class="k">=</span> <span class="s">"name age"</span> |
| |
| <span class="c1">// Import Row.</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| |
| <span class="c1">// Import Spark SQL data types</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.sql.types.</span><span class="o">{</span><span class="nc">StructType</span><span class="o">,</span><span class="nc">StructField</span><span class="o">,</span><span class="nc">StringType</span><span class="o">};</span> |
| |
| <span class="c1">// Generate the schema based on the string of schema</span> |
| <span class="k">val</span> <span class="n">schema</span> <span class="k">=</span> |
| <span class="nc">StructType</span><span class="o">(</span> |
| <span class="n">schemaString</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">fieldName</span> <span class="k">=></span> <span class="nc">StructField</span><span class="o">(</span><span class="n">fieldName</span><span class="o">,</span> <span class="nc">StringType</span><span class="o">,</span> <span class="kc">true</span><span class="o">)))</span> |
| |
| <span class="c1">// Convert records of the RDD (people) to Rows.</span> |
| <span class="k">val</span> <span class="n">rowRDD</span> <span class="k">=</span> <span class="n">people</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">)).</span><span class="n">map</span><span class="o">(</span><span class="n">p</span> <span class="k">=></span> <span class="nc">Row</span><span class="o">(</span><span class="n">p</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">p</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">trim</span><span class="o">))</span> |
| |
| <span class="c1">// Apply the schema to the RDD.</span> |
| <span class="k">val</span> <span class="n">peopleDataFrame</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="o">(</span><span class="n">rowRDD</span><span class="o">,</span> <span class="n">schema</span><span class="o">)</span> |
| |
| <span class="c1">// Register the DataFrames as a table.</span> |
| <span class="n">peopleDataFrame</span><span class="o">.</span><span class="n">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by sqlContext.</span> |
| <span class="k">val</span> <span class="n">results</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people"</span><span class="o">)</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations.</span> |
| <span class="c1">// The columns of a row in the result can be accessed by field index or by field name.</span> |
| <span class="n">results</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>When JavaBean classes cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed and |
| fields will be projected differently for different users), |
| a <code>DataFrame</code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of <code>Row</code>s from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| <code>Row</code>s in the RDD created in Step 1.</li> |
| <li>Apply the schema to the RDD of <code>Row</code>s via the <code>createDataFrame</code> method provided
| by <code>SQLContext</code>.</li> |
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.Function</span><span class="o">;</span> |
| <span class="c1">// Import factory methods provided by DataTypes.</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.DataTypes</span><span class="o">;</span> |
| <span class="c1">// Import StructType and StructField</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructType</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.types.StructField</span><span class="o">;</span> |
| <span class="c1">// Import Row.</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span> |
| <span class="c1">// Import RowFactory.</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.sql.RowFactory</span><span class="o">;</span> |
| |
| <span class="c1">// sc is an existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="c1">// Load a text file and convert each line to a JavaBean.</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">people</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="o">);</span> |
| |
| <span class="c1">// The schema is encoded in a string</span> |
| <span class="n">String</span> <span class="n">schemaString</span> <span class="o">=</span> <span class="s">"name age"</span><span class="o">;</span> |
| |
| <span class="c1">// Generate the schema based on the string of schema</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">StructField</span><span class="o">></span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o"><</span><span class="n">StructField</span><span class="o">>();</span> |
| <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="nl">fieldName:</span> <span class="n">schemaString</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> <span class="o">{</span> |
| <span class="n">fields</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructField</span><span class="o">(</span><span class="n">fieldName</span><span class="o">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">StringType</span><span class="o">,</span> <span class="kc">true</span><span class="o">));</span> |
| <span class="o">}</span> |
| <span class="n">StructType</span> <span class="n">schema</span> <span class="o">=</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">createStructType</span><span class="o">(</span><span class="n">fields</span><span class="o">);</span> |
| |
| <span class="c1">// Convert records of the RDD (people) to Rows.</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">Row</span><span class="o">></span> <span class="n">rowRDD</span> <span class="o">=</span> <span class="n">people</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> |
| <span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Row</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">Row</span> <span class="nf">call</span><span class="o">(</span><span class="n">String</span> <span class="n">record</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="n">String</span><span class="o">[]</span> <span class="n">fields</span> <span class="o">=</span> <span class="n">record</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">","</span><span class="o">);</span> |
| <span class="k">return</span> <span class="n">RowFactory</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">fields</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span> <span class="n">fields</span><span class="o">[</span><span class="mi">1</span><span class="o">].</span><span class="na">trim</span><span class="o">());</span> |
| <span class="o">}</span> |
| <span class="o">});</span> |
| |
| <span class="c1">// Apply the schema to the RDD.</span> |
| <span class="n">DataFrame</span> <span class="n">peopleDataFrame</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">rowRDD</span><span class="o">,</span> <span class="n">schema</span><span class="o">);</span> |
| |
| <span class="c1">// Register the DataFrame as a table.</span> |
| <span class="n">peopleDataFrame</span><span class="o">.</span><span class="na">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// SQL can be run over RDDs that have been registered as tables.</span> |
| <span class="n">DataFrame</span> <span class="n">results</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people"</span><span class="o">);</span> |
| |
| <span class="c1">// The results of SQL queries are DataFrames and support all the normal RDD operations.</span> |
| <span class="c1">// The columns of a row in the result can be accessed by ordinal.</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">names</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="na">javaRDD</span><span class="o">().</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">call</span><span class="o">(</span><span class="n">Row</span> <span class="n">row</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}).</span><span class="na">collect</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>When a dictionary of kwargs cannot be defined ahead of time (for example, |
| the structure of records is encoded in a string, or a text dataset will be parsed and |
| fields will be projected differently for different users), |
| a <code>DataFrame</code> can be created programmatically with three steps.</p> |
| |
| <ol> |
| <li>Create an RDD of tuples or lists from the original RDD;</li> |
| <li>Create the schema represented by a <code>StructType</code> matching the structure of |
| tuples or lists in the RDD created in Step 1.</li>
| <li>Apply the schema to the RDD via the <code>createDataFrame</code> method provided by <code>SQLContext</code>.</li>
| </ol> |
| |
| <p>For example:</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># Import SQLContext and data types</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="o">*</span> |
| |
| <span class="c"># sc is an existing SparkContext.</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="c"># Load a text file and convert each line to a tuple.</span> |
| <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.txt"</span><span class="p">)</span> |
| <span class="n">parts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">l</span><span class="p">:</span> <span class="n">l</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s">","</span><span class="p">))</span> |
| <span class="n">people</span> <span class="o">=</span> <span class="n">parts</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="p">(</span><span class="n">p</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span> <span class="n">p</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">strip</span><span class="p">()))</span> |
| |
| <span class="c"># The schema is encoded in a string.</span> |
| <span class="n">schemaString</span> <span class="o">=</span> <span class="s">"name age"</span> |
| |
| <span class="n">fields</span> <span class="o">=</span> <span class="p">[</span><span class="n">StructField</span><span class="p">(</span><span class="n">field_name</span><span class="p">,</span> <span class="n">StringType</span><span class="p">(),</span> <span class="bp">True</span><span class="p">)</span> <span class="k">for</span> <span class="n">field_name</span> <span class="ow">in</span> <span class="n">schemaString</span><span class="o">.</span><span class="n">split</span><span class="p">()]</span> |
| <span class="n">schema</span> <span class="o">=</span> <span class="n">StructType</span><span class="p">(</span><span class="n">fields</span><span class="p">)</span> |
| |
| <span class="c"># Apply the schema to the RDD.</span> |
| <span class="n">schemaPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">people</span><span class="p">,</span> <span class="n">schema</span><span class="p">)</span> |
| |
| <span class="c"># Register the DataFrame as a table.</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="n">registerTempTable</span><span class="p">(</span><span class="s">"people"</span><span class="p">)</span> |
| |
| <span class="c"># SQL can be run over DataFrames that have been registered as a table.</span> |
| <span class="n">results</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT name FROM people"</span><span class="p">)</span> |
| |
| <span class="c"># The results of SQL queries are RDDs and support all the normal RDD operations.</span> |
| <span class="n">names</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">names</span><span class="o">.</span><span class="n">collect</span><span class="p">():</span> |
| <span class="k">print</span><span class="p">(</span><span class="n">name</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h1 id="data-sources">Data Sources</h1> |
| |
| <p>Spark SQL supports operating on a variety of data sources through the <code>DataFrame</code> interface. |
| A DataFrame can be operated on as a normal RDD and can also be registered as a temporary table.
| Registering a DataFrame as a table allows you to run SQL queries over its data. This section |
| describes the general methods for loading and saving data using the Spark Data Sources and then |
| goes into specific options that are available for the built-in data sources.</p> |
| |
| <h2 id="generic-loadsave-functions">Generic Load/Save Functions</h2> |
| |
| <p>In the simplest form, the default data source (<code>parquet</code> unless otherwise configured by |
| <code>spark.sql.sources.default</code>) will be used for all operations.</p> |
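| 
| <p>The default itself can be changed per session through that property. A minimal sketch, assuming an
| existing <code>sqlContext</code> and using JSON purely as an illustration:</p>
| 
| <div class="highlight"><pre><code class="language-python" data-lang="python"># Sketch only: make JSON the default data source for this SQLContext.
| sqlContext.setConf("spark.sql.sources.default", "json")
| 
| # Generic load/save calls made afterwards default to JSON instead of Parquet.
| df = sqlContext.read.load("examples/src/main/resources/people.json")</code></pre></div>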
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">load</span><span class="o">(</span><span class="s">"examples/src/main/resources/users.parquet"</span><span class="o">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"favorite_color"</span><span class="o">).</span><span class="n">write</span><span class="o">.</span><span class="n">save</span><span class="o">(</span><span class="s">"namesAndFavColors.parquet"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">load</span><span class="o">(</span><span class="s">"examples/src/main/resources/users.parquet"</span><span class="o">);</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"favorite_color"</span><span class="o">).</span><span class="na">write</span><span class="o">().</span><span class="na">save</span><span class="o">(</span><span class="s">"namesAndFavColors.parquet"</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s">"examples/src/main/resources/users.parquet"</span><span class="p">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"name"</span><span class="p">,</span> <span class="s">"favorite_color"</span><span class="p">)</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">save</span><span class="p">(</span><span class="s">"namesAndFavColors.parquet"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r">df <span class="o"><-</span> loadDF<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"people.parquet"</span><span class="p">)</span> |
| saveDF<span class="p">(</span>select<span class="p">(</span>df<span class="p">,</span> <span class="s">"name"</span><span class="p">,</span> <span class="s">"age"</span><span class="p">),</span> <span class="s">"namesAndAges.parquet"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h3 id="manually-specifying-options">Manually Specifying Options</h3> |
| |
| <p>You can also manually specify the data source that will be used along with any extra options |
| that you would like to pass to the data source. Data sources are specified by their fully qualified |
| name (e.g., <code>org.apache.spark.sql.parquet</code>), but for built-in sources you can also use their short
| names (<code>json</code>, <code>parquet</code>, <code>jdbc</code>). DataFrames of any type can be converted into other types |
| using this syntax.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">df</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="s">"json"</span><span class="o">).</span><span class="n">load</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"age"</span><span class="o">).</span><span class="n">write</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="s">"parquet"</span><span class="o">).</span><span class="n">save</span><span class="o">(</span><span class="s">"namesAndAges.parquet"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataFrame</span> <span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">format</span><span class="o">(</span><span class="s">"json"</span><span class="o">).</span><span class="na">load</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">);</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"age"</span><span class="o">).</span><span class="na">write</span><span class="o">().</span><span class="na">format</span><span class="o">(</span><span class="s">"parquet"</span><span class="o">).</span><span class="na">save</span><span class="o">(</span><span class="s">"namesAndAges.parquet"</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="p">,</span> <span class="n">format</span><span class="o">=</span><span class="s">"json"</span><span class="p">)</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s">"name"</span><span class="p">,</span> <span class="s">"age"</span><span class="p">)</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">save</span><span class="p">(</span><span class="s">"namesAndAges.parquet"</span><span class="p">,</span> <span class="n">format</span><span class="o">=</span><span class="s">"parquet"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r">df <span class="o"><-</span> loadDF<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"people.json"</span><span class="p">,</span> <span class="s">"json"</span><span class="p">)</span> |
| saveDF<span class="p">(</span>select<span class="p">(</span>df<span class="p">,</span> <span class="s">"name"</span><span class="p">,</span> <span class="s">"age"</span><span class="p">),</span> <span class="s">"namesAndAges.parquet"</span><span class="p">,</span> <span class="s">"parquet"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h3 id="save-modes">Save Modes</h3> |
| |
| <p>Save operations can optionally take a <code>SaveMode</code>, which specifies how to handle existing data if
| present. It is important to realize that these save modes do not utilize any locking and are not
| atomic. Additionally, when performing an <code>Overwrite</code>, the data will be deleted before writing out the
| new data. A short sketch of passing a mode to a writer follows the table below.</p>
| |
| <table class="table"> |
| <tr><th>Scala/Java</th><th>Any Language</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>SaveMode.ErrorIfExists</code> (default)</td> |
| <td><code>"error"</code> (default)</td> |
| <td> |
| When saving a DataFrame to a data source, if data already exists, |
| an exception is expected to be thrown. |
| </td> |
| </tr> |
| <tr> |
| <td><code>SaveMode.Append</code></td> |
| <td><code>"append"</code></td> |
| <td> |
| When saving a DataFrame to a data source, if data/table already exists, |
| contents of the DataFrame are expected to be appended to existing data. |
| </td> |
| </tr> |
| <tr> |
| <td><code>SaveMode.Overwrite</code></td> |
| <td><code>"overwrite"</code></td> |
| <td> |
| Overwrite mode means that when saving a DataFrame to a data source, |
| if data/table already exists, existing data is expected to be overwritten by the contents of |
| the DataFrame. |
| </td> |
| </tr> |
| <tr> |
| <td><code>SaveMode.Ignore</code></td> |
| <td><code>"ignore"</code></td> |
| <td> |
| Ignore mode means that when saving a DataFrame to a data source, if data already exists, |
| the save operation is expected to not save the contents of the DataFrame and to not |
| change the existing data. This is similar to a <code>CREATE TABLE IF NOT EXISTS</code> in SQL. |
| </td> |
| </tr> |
| </table> |
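| 
| <p>A minimal sketch of passing one of these modes from Python, reusing the <code>df</code> from the generic
| load/save example above (the output path is reused purely for illustration):</p>
| 
| <div class="highlight"><pre><code class="language-python" data-lang="python"># Sketch only: append to existing output instead of raising an error.
| df.select("name", "favorite_color").write.mode("append").save("namesAndFavColors.parquet")
| 
| # "overwrite" deletes whatever is already at the path before writing the new data.
| df.select("name", "favorite_color").write.mode("overwrite").save("namesAndFavColors.parquet")</code></pre></div>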
| |
| <h3 id="saving-to-persistent-tables">Saving to Persistent Tables</h3> |
| |
| <p>When working with a <code>HiveContext</code>, <code>DataFrames</code> can also be saved as persistent tables using the
| <code>saveAsTable</code> command. Unlike the <code>registerTempTable</code> command, <code>saveAsTable</code> will materialize the
| contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables
| will still exist even after your Spark program has restarted, as long as you maintain your connection
| to the same metastore. A DataFrame for a persistent table can be created by calling the <code>table</code>
| method on a <code>SQLContext</code> with the name of the table.</p>
| |
| <p>By default <code>saveAsTable</code> will create a “managed table”, meaning that the location of the data will |
| be controlled by the metastore. Managed tables will also have their data deleted automatically |
| when a table is dropped.</p> |
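| 
| <p>A minimal sketch from Python, assuming the <code>schemaPeople</code> DataFrame from the earlier examples was
| built with a <code>HiveContext</code> named <code>sqlContext</code>, and using the hypothetical table name
| <code>people_saved</code>:</p>
| 
| <div class="highlight"><pre><code class="language-python" data-lang="python"># Sketch only: materialize the DataFrame as a managed table in the metastore.
| schemaPeople.write.saveAsTable("people_saved")
| 
| # In a later session against the same metastore, read the table back by name.
| savedPeople = sqlContext.table("people_saved")</code></pre></div>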
| |
| <h2 id="parquet-files">Parquet Files</h2> |
| |
| <p><a href="http://parquet.io">Parquet</a> is a columnar format that is supported by many other data processing systems. |
| Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema |
| of the original data.</p> |
| |
| <h3 id="loading-data-programmatically">Loading Data Programmatically</h3> |
| |
| <p>Using the data from the above example:</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sqlContext from the previous example is used in this example.</span> |
| <span class="c1">// This is used to implicitly convert an RDD to a DataFrame.</span> |
| <span class="k">import</span> <span class="nn">sqlContext.implicits._</span> |
| |
| <span class="k">val</span> <span class="n">people</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">Person</span><span class="o">]</span> <span class="k">=</span> <span class="o">...</span> <span class="c1">// An RDD of case class objects, from the previous example.</span> |
| |
| <span class="c1">// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">)</span> |
| |
| <span class="c1">// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.</span> |
| <span class="c1">// The result of loading a Parquet file is also a DataFrame.</span> |
| <span class="k">val</span> <span class="n">parquetFile</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">parquet</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">)</span> |
| |
| <span class="c1">//Parquet files can also be registered as tables and then used in SQL statements.</span> |
| <span class="n">parquetFile</span><span class="o">.</span><span class="n">registerTempTable</span><span class="o">(</span><span class="s">"parquetFile"</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">teenagers</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"</span><span class="o">)</span> |
| <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=></span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sqlContext from the previous example is used in this example.</span> |
| |
| <span class="n">DataFrame</span> <span class="n">schemaPeople</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// The DataFrame from the previous example.</span> |
| |
| <span class="c1">// DataFrames can be saved as Parquet files, maintaining the schema information.</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="na">write</span><span class="o">().</span><span class="na">parquet</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">);</span> |
| |
| <span class="c1">// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.</span> |
| <span class="c1">// The result of loading a parquet file is also a DataFrame.</span> |
| <span class="n">DataFrame</span> <span class="n">parquetFile</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">parquet</span><span class="o">(</span><span class="s">"people.parquet"</span><span class="o">);</span> |
| |
| <span class="c1">// Parquet files can also be registered as tables and then used in SQL statements.</span> |
| <span class="n">parquetFile</span><span class="o">.</span><span class="na">registerTempTable</span><span class="o">(</span><span class="s">"parquetFile"</span><span class="o">);</span> |
| <span class="n">DataFrame</span> <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"</span><span class="o">);</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">teenagerNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="na">javaRDD</span><span class="o">().</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="kd">public</span> <span class="n">String</span> <span class="nf">call</span><span class="o">(</span><span class="n">Row</span> <span class="n">row</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}).</span><span class="na">collect</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sqlContext from the previous example is used in this example.</span> |
| |
| <span class="n">schemaPeople</span> <span class="c"># The DataFrame from the previous example.</span> |
| |
| <span class="c"># DataFrames can be saved as Parquet files, maintaining the schema information.</span> |
| <span class="n">schemaPeople</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s">"people.parquet"</span><span class="p">)</span> |
| |
| <span class="c"># Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.</span> |
| <span class="c"># The result of loading a parquet file is also a DataFrame.</span> |
| <span class="n">parquetFile</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s">"people.parquet"</span><span class="p">)</span> |
| |
| <span class="c"># Parquet files can also be registered as tables and then used in SQL statements.</span> |
| <span class="n">parquetFile</span><span class="o">.</span><span class="n">registerTempTable</span><span class="p">(</span><span class="s">"parquetFile"</span><span class="p">);</span> |
| <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"</span><span class="p">)</span> |
| <span class="n">teenNames</span> <span class="o">=</span> <span class="n">teenagers</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">p</span><span class="p">:</span> <span class="s">"Name: "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">teenName</span> <span class="ow">in</span> <span class="n">teenNames</span><span class="o">.</span><span class="n">collect</span><span class="p">():</span> |
| <span class="k">print</span><span class="p">(</span><span class="n">teenName</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r"><span class="c1"># sqlContext from the previous example is used in this example.</span> |
| |
| schemaPeople <span class="c1"># The DataFrame from the previous example.</span> |
| |
| <span class="c1"># DataFrames can be saved as Parquet files, maintaining the schema information.</span> |
| saveAsParquetFile<span class="p">(</span>schemaPeople<span class="p">,</span> <span class="s">"people.parquet"</span><span class="p">)</span> |
| |
| <span class="c1"># Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.</span> |
| <span class="c1"># The result of loading a parquet file is also a DataFrame.</span> |
| parquetFile <span class="o"><-</span> parquetFile<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"people.parquet"</span><span class="p">)</span> |
| |
| <span class="c1"># Parquet files can also be registered as tables and then used in SQL statements.</span> |
| registerTempTable<span class="p">(</span>parquetFile<span class="p">,</span> <span class="s">"parquetFile"</span><span class="p">);</span> |
| teenagers <span class="o"><-</span> sql<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"</span><span class="p">)</span> |
| teenNames <span class="o"><-</span> map<span class="p">(</span>teenagers<span class="p">,</span> <span class="kr">function</span><span class="p">(</span>p<span class="p">)</span> <span class="p">{</span> <span class="kp">paste</span><span class="p">(</span><span class="s">"Name:"</span><span class="p">,</span> p<span class="o">$</span>name<span class="p">)})</span> |
| <span class="kr">for</span> <span class="p">(</span>teenName <span class="kr">in</span> collect<span class="p">(</span>teenNames<span class="p">))</span> <span class="p">{</span> |
| <span class="kp">cat</span><span class="p">(</span>teenName<span class="p">,</span> <span class="s">"\n"</span><span class="p">)</span> |
| <span class="p">}</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sqlContext is an existing HiveContext</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"REFRESH TABLE my_table"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="sql"> |
| |
| <div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">TEMPORARY</span> <span class="k">TABLE</span> <span class="n">parquetTable</span> |
| <span class="k">USING</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">spark</span><span class="p">.</span><span class="k">sql</span><span class="p">.</span><span class="n">parquet</span> |
| <span class="k">OPTIONS</span> <span class="p">(</span> |
| <span class="n">path</span> <span class="ss">"examples/src/main/resources/people.parquet"</span> |
| <span class="p">)</span> |
| |
| <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">parquetTable</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="partition-discovery">Partition Discovery</h3> |
| |
| <p>Table partitioning is a common optimization approach used in systems like Hive. In a partitioned |
| table, data are usually stored in different directories, with partitioning column values encoded in |
| the path of each partition directory. The Parquet data source is now able to discover and infer |
| partitioning information automatically. For example, we can store all our previously used |
| population data into a partitioned table using the following directory structure, with two extra |
| columns, <code>gender</code> and <code>country</code> as partitioning columns:</p> |
| |
| <div class="highlight"><pre><code class="language-text" data-lang="text">path |
| └── to |
| └── table |
| ├── gender=male |
| │  ├── ... |
| │  │ |
| │  ├── country=US |
| │  │  └── data.parquet |
| │  ├── country=CN |
| │  │  └── data.parquet |
| │  └── ... |
| └── gender=female |
|   ├── ... |
|   │ |
|   ├── country=US |
|   │  └── data.parquet |
|   ├── country=CN |
|   │  └── data.parquet |
|   └── ...</code></pre></div> |
| |
| <p>By passing <code>path/to/table</code> to either <code>SQLContext.read.parquet</code> or <code>SQLContext.read.load</code>, Spark SQL |
| will automatically extract the partitioning information from the paths. |
| Now the schema of the returned DataFrame becomes:</p> |
| |
| <div class="highlight"><pre><code class="language-text" data-lang="text">root |
| |-- name: string (nullable = true) |
| |-- age: long (nullable = true) |
| |-- gender: string (nullable = true) |
| |-- country: string (nullable = true)</code></pre></div> |
| |
| <p>Notice that the data types of the partitioning columns are automatically inferred. Currently, |
| numeric data types and string type are supported. Sometimes users may not want to automatically |
| infer the data types of the partitioning columns. For these use cases, the automatic type inference |
| can be configured by <code>spark.sql.sources.partitionColumnTypeInference.enabled</code>, which defaults to
| <code>true</code>. When type inference is disabled, string type will be used for the partitioning columns.</p> |
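| 
| <p>A minimal sketch of both behaviors from Python, assuming the <code>path/to/table</code> layout shown above
| actually exists on disk:</p>
| 
| <div class="highlight"><pre><code class="language-python" data-lang="python"># Sketch only: reading the table root discovers gender and country as partition columns.
| df = sqlContext.read.parquet("path/to/table")
| df.printSchema()
| 
| # Disable the inference for subsequent reads; partition column values are then kept as strings.
| sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")</code></pre></div>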
| |
| <h3 id="schema-merging">Schema Merging</h3> |
| |
| <p>Like Protocol Buffers, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
| a simple schema, and gradually add more columns to the schema as needed. In this way, users may end |
| up with multiple Parquet files with different but mutually compatible schemas. The Parquet data |
| source is now able to automatically detect this case and merge schemas of all these files.</p> |
| |
| <p>Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we |
| turned it off by default starting from 1.5.0. You may enable it by</p> |
| |
| <ol> |
| <li>setting data source option <code>mergeSchema</code> to <code>true</code> when reading Parquet files (as shown in the |
| examples below), or</li> |
| <li>setting the global SQL option <code>spark.sql.parquet.mergeSchema</code> to <code>true</code> (a sketch of this follows the list).</li>
| </ol> |
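| 
| <p>Option 2 is a session-wide setting. A minimal sketch from Python, assuming an existing <code>sqlContext</code>:</p>
| 
| <div class="highlight"><pre><code class="language-python" data-lang="python"># Sketch only: turn schema merging on for every Parquet read in this session.
| sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")</code></pre></div>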
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sqlContext from the previous example is used in this example.</span> |
| <span class="c1">// This is used to implicitly convert an RDD to a DataFrame.</span> |
| <span class="k">import</span> <span class="nn">sqlContext.implicits._</span> |
| |
| <span class="c1">// Create a simple DataFrame, stored into a partition directory</span> |
| <span class="k">val</span> <span class="n">df1</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">makeRDD</span><span class="o">(</span><span class="mi">1</span> <span class="n">to</span> <span class="mi">5</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">i</span> <span class="k">=></span> <span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">i</span> <span class="o">*</span> <span class="mi">2</span><span class="o">)).</span><span class="n">toDF</span><span class="o">(</span><span class="s">"single"</span><span class="o">,</span> <span class="s">"double"</span><span class="o">)</span> |
| <span class="n">df1</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="o">(</span><span class="s">"data/test_table/key=1"</span><span class="o">)</span> |
| |
| <span class="c1">// Create another DataFrame in a new partition directory,</span> |
| <span class="c1">// adding a new column and dropping an existing column</span> |
| <span class="k">val</span> <span class="n">df2</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">makeRDD</span><span class="o">(</span><span class="mi">6</span> <span class="n">to</span> <span class="mi">10</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">i</span> <span class="k">=></span> <span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">i</span> <span class="o">*</span> <span class="mi">3</span><span class="o">)).</span><span class="n">toDF</span><span class="o">(</span><span class="s">"single"</span><span class="o">,</span> <span class="s">"triple"</span><span class="o">)</span> |
| <span class="n">df2</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="o">(</span><span class="s">"data/test_table/key=2"</span><span class="o">)</span> |
| |
| <span class="c1">// Read the partitioned table</span> |
| <span class="k">val</span> <span class="n">df3</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">option</span><span class="o">(</span><span class="s">"mergeSchema"</span><span class="o">,</span> <span class="s">"true"</span><span class="o">).</span><span class="n">parquet</span><span class="o">(</span><span class="s">"data/test_table"</span><span class="o">)</span> |
| <span class="n">df3</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span> |
| |
| <span class="c1">// The final schema consists of all 3 columns in the Parquet files together</span> |
| <span class="c1">// with the partitioning column appeared in the partition directory paths.</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- single: int (nullable = true)</span> |
| <span class="c1">// |-- double: int (nullable = true)</span> |
| <span class="c1">// |-- triple: int (nullable = true)</span> |
| <span class="c1">// |-- key : int (nullable = true)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sqlContext from the previous example is used in this example.</span> |
| |
| <span class="c"># Create a simple DataFrame, stored into a partition directory</span> |
| <span class="n">df1</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">6</span><span class="p">))</span>\ |
| <span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">single</span><span class="o">=</span><span class="n">i</span><span class="p">,</span> <span class="n">double</span><span class="o">=</span><span class="n">i</span> <span class="o">*</span> <span class="mi">2</span><span class="p">)))</span> |
| <span class="n">df1</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s">"data/test_table/key=1"</span><span class="p">)</span> |
| |
| <span class="c"># Create another DataFrame in a new partition directory,</span> |
| <span class="c"># adding a new column and dropping an existing column</span> |
| <span class="n">df2</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="mi">6</span><span class="p">,</span> <span class="mi">11</span><span class="p">))</span> |
| <span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">single</span><span class="o">=</span><span class="n">i</span><span class="p">,</span> <span class="n">triple</span><span class="o">=</span><span class="n">i</span> <span class="o">*</span> <span class="mi">3</span><span class="p">)))</span> |
| <span class="n">df2</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s">"data/test_table/key=2"</span><span class="p">)</span> |
| |
| <span class="c"># Read the partitioned table</span> |
| <span class="n">df3</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">option</span><span class="p">(</span><span class="s">"mergeSchema"</span><span class="p">,</span> <span class="s">"true"</span><span class="p">)</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s">"data/test_table"</span><span class="p">)</span> |
| <span class="n">df3</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| |
| <span class="c"># The final schema consists of all 3 columns in the Parquet files together</span> |
| <span class="c"># with the partitioning column appeared in the partition directory paths.</span> |
| <span class="c"># root</span> |
| <span class="c"># |-- single: int (nullable = true)</span> |
| <span class="c"># |-- double: int (nullable = true)</span> |
| <span class="c"># |-- triple: int (nullable = true)</span> |
| <span class="c"># |-- key : int (nullable = true)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r"><span class="c1"># sqlContext from the previous example is used in this example.</span> |
| |
| <span class="c1"># Create a simple DataFrame, stored into a partition directory</span> |
| saveDF<span class="p">(</span>df1<span class="p">,</span> <span class="s">"data/test_table/key=1"</span><span class="p">,</span> <span class="s">"parquet"</span><span class="p">,</span> <span class="s">"overwrite"</span><span class="p">)</span> |
| |
| <span class="c1"># Create another DataFrame in a new partition directory,</span> |
| <span class="c1"># adding a new column and dropping an existing column</span> |
| saveDF<span class="p">(</span>df2<span class="p">,</span> <span class="s">"data/test_table/key=2"</span><span class="p">,</span> <span class="s">"parquet"</span><span class="p">,</span> <span class="s">"overwrite"</span><span class="p">)</span> |
| |
| <span class="c1"># Read the partitioned table</span> |
| df3 <span class="o"><-</span> loadDF<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"data/test_table"</span><span class="p">,</span> <span class="s">"parquet"</span><span class="p">,</span> mergeSchema<span class="o">=</span><span class="s">"true"</span><span class="p">)</span> |
| printSchema<span class="p">(</span>df3<span class="p">)</span> |
| |
| <span class="c1"># The final schema consists of all 3 columns in the Parquet files together</span> |
| <span class="c1"># with the partitioning column appeared in the partition directory paths.</span> |
| <span class="c1"># root</span> |
| <span class="c1"># |-- single: int (nullable = true)</span> |
| <span class="c1"># |-- double: int (nullable = true)</span> |
| <span class="c1"># |-- triple: int (nullable = true)</span> |
| <span class="c1"># |-- key : int (nullable = true)</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="hive-metastore-parquet-table-conversion">Hive metastore Parquet table conversion</h3> |
| |
| <p>When reading from and writing to Hive metastore Parquet tables, Spark SQL will try to use its own |
| Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the |
| <code>spark.sql.hive.convertMetastoreParquet</code> configuration, and is turned on by default.</p> |
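| <p>For example, to fall back to the Hive SerDe for metastore Parquet tables in the current session (a minimal |
| sketch, assuming <code>sqlContext</code> is an existing <code>HiveContext</code>):</p> |
| |
| <pre><code>// Use the Hive SerDe instead of the built-in Parquet support for this session |
| sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false") |
| // Equivalently, as a SQL command |
| sqlContext.sql("SET spark.sql.hive.convertMetastoreParquet=false") |
| </code></pre> |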
| |
| <h4 id="hiveparquet-schema-reconciliation">Hive/Parquet Schema Reconciliation</h4> |
| |
| <p>There are two key differences between Hive and Parquet from the perspective of table schema |
| processing.</p> |
| |
| <ol> |
| <li>Hive is case insensitive, while Parquet is not</li> |
| <li>Hive considers all columns nullable, while nullability in Parquet is significant</li> |
| </ol> |
| |
| <p>For these reasons, we must reconcile the Hive metastore schema with the Parquet schema when converting a |
| Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are as follows (a small |
| worked example is given after the list):</p> |
| |
| <ol> |
| <li> |
| <p>Fields that have the same name in both schemas must have the same data type regardless of |
| nullability. The reconciled field should have the data type of the Parquet side, so that |
| nullability is respected.</p> |
| </li> |
| <li> |
| <p>The reconciled schema contains exactly those fields defined in the Hive metastore schema.</p> |
| |
| <ul> |
| <li>Any fields that only appear in the Parquet schema are dropped in the reconciled schema.</li> |
| <li>Any fields that only appear in the Hive metastore schema are added as nullable fields in the |
| reconciled schema.</li> |
| </ul> |
| </li> |
| </ol> |
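| <p>As an illustration only (hypothetical schemas), suppose the Hive metastore declares a table as |
| <code>(id INT, name STRING)</code> while the underlying Parquet files carry the schema |
| <code>(id INT not null, extra STRING)</code>. Applying the rules above yields:</p> |
| |
| <pre><code>Hive metastore schema : id INT, name STRING |
| Parquet file schema   : id INT (not null), extra STRING |
| |
| Reconciled schema     : id INT (not null)      -- same name in both: Parquet data type and nullability are used |
|                         name STRING (nullable) -- only in the metastore schema: added as a nullable field |
|                         -- extra only appears in the Parquet schema and is therefore dropped |
| </code></pre> |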
| |
| <h4 id="metadata-refreshing">Metadata Refreshing</h4> |
| |
| <p>Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table |
| conversion is enabled, the metadata of those converted tables is also cached. If these tables are |
| updated by Hive or other external tools, you need to refresh them manually to ensure consistent |
| metadata.</p> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sqlContext is an existing HiveContext</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">refreshTable</span><span class="o">(</span><span class="s">"my_table"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sqlContext is an existing HiveContext</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="na">refreshTable</span><span class="o">(</span><span class="s">"my_table"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sqlContext is an existing HiveContext</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">refreshTable</span><span class="p">(</span><span class="s">"my_table"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="sql"> |
| |
| <div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="n">REFRESH</span> <span class="k">TABLE</span> <span class="n">my_table</span><span class="p">;</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h3 id="configuration">Configuration</h3> |
| |
| <p>Configuration of Parquet can be done using the <code>setConf</code> method on <code>SQLContext</code> or by running |
| <code>SET key=value</code> commands using SQL.</p> |
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>spark.sql.parquet.binaryAsString</code></td> |
| <td>false</td> |
| <td> |
| Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do |
| not differentiate between binary data and strings when writing out the Parquet schema. This |
| flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.int96AsTimestamp</code></td> |
| <td>true</td> |
| <td> |
| Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This |
| flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.cacheMetadata</code></td> |
| <td>true</td> |
| <td> |
| Turns on caching of Parquet schema metadata. Can speed up querying of static data. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.compression.codec</code></td> |
| <td>gzip</td> |
| <td> |
| Sets the compression codec used when writing Parquet files. Acceptable values include: |
| uncompressed, snappy, gzip, lzo. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.filterPushdown</code></td> |
| <td>true</td> |
| <td>Enables Parquet filter push-down optimization when set to true.</td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.hive.convertMetastoreParquet</code></td> |
| <td>true</td> |
| <td> |
| When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in |
| support. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.output.committer.class</code></td> |
| <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td> |
| <td> |
| <p> |
| The output committer class used by Parquet. The specified class needs to be a subclass of |
| <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a |
| subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>. |
| </p> |
| <p> |
| <b>Note:</b> |
| <ul> |
| <li> |
| This option is automatically ignored if <code>spark.speculation</code> is turned on. |
| </li> |
| <li> |
| This option must be set via Hadoop <code>Configuration</code> rather than Spark |
| <code>SQLConf</code>. |
| </li> |
| <li> |
| This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>. |
| </li> |
| </ul> |
| </p> |
| <p> |
| Spark SQL comes with a builtin |
| <code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more |
| efficient than the default Parquet output committer when writing data to S3. |
| </p> |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.parquet.mergeSchema</code></td> |
| <td><code>false</code></td> |
| <td> |
| <p> |
| When true, the Parquet data source merges schemas collected from all data files, otherwise the |
| schema is picked from the summary file or a random data file if no summary file is available. |
| </p> |
| </td> |
| </tr> |
| </table> |
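| <p>For example, the output compression codec listed above can be changed for the current session using either |
| mechanism (a minimal sketch, assuming an existing <code>sqlContext</code>):</p> |
| |
| <pre><code>// Programmatically, via setConf |
| sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy") |
| // Or as a SQL command |
| sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy") |
| </code></pre> |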
| |
| <h2 id="json-datasets">JSON Datasets</h2> |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| <p>Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. |
| This conversion can be done using <code>SQLContext.read.json()</code> on either an RDD of String, |
| or a JSON file.</p> |
| |
| <p>Note that the file that is offered as <em>a json file</em> is not a typical JSON file. Each |
| line must contain a separate, self-contained valid JSON object. As a consequence, |
| a regular multi-line JSON file will most often fail.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sc is an existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="nc">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="c1">// A JSON dataset is pointed to by path.</span> |
| <span class="c1">// The path can be either a single text file or a directory storing text files.</span> |
| <span class="k">val</span> <span class="n">path</span> <span class="k">=</span> <span class="s">"examples/src/main/resources/people.json"</span> |
| <span class="k">val</span> <span class="n">people</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="n">path</span><span class="o">)</span> |
| |
| <span class="c1">// The inferred schema can be visualized using the printSchema() method.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: integer (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Register this DataFrame as a table.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">)</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by sqlContext.</span> |
| <span class="k">val</span> <span class="n">teenagers</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="o">)</span> |
| |
| <span class="c1">// Alternatively, a DataFrame can be created for a JSON dataset represented by</span> |
| <span class="c1">// an RDD[String] storing one JSON object per string.</span> |
| <span class="k">val</span> <span class="n">anotherPeopleRDD</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span> |
| <span class="s">"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}"""</span> <span class="o">::</span> <span class="nc">Nil</span><span class="o">)</span> |
| <span class="k">val</span> <span class="n">anotherPeople</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="n">anotherPeopleRDD</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| <p>Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. |
| This conversion can be done using <code>SQLContext.read().json()</code> on either an RDD of String, |
| or a JSON file.</p> |
| |
| <p>Note that the file that is offered as <em>a json file</em> is not a typical JSON file. Each |
| line must contain a separate, self-contained valid JSON object. As a consequence, |
| a regular multi-line JSON file will most often fail.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sc is an existing JavaSparkContext.</span> |
| <span class="n">SQLContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">SQLContext</span><span class="o">(</span><span class="n">sc</span><span class="o">);</span> |
| |
| <span class="c1">// A JSON dataset is pointed to by path.</span> |
| <span class="c1">// The path can be either a single text file or a directory storing text files.</span> |
| <span class="n">DataFrame</span> <span class="n">people</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="o">);</span> |
| |
| <span class="c1">// The inferred schema can be visualized using the printSchema() method.</span> |
| <span class="n">people</span><span class="o">.</span><span class="na">printSchema</span><span class="o">();</span> |
| <span class="c1">// root</span> |
| <span class="c1">// |-- age: integer (nullable = true)</span> |
| <span class="c1">// |-- name: string (nullable = true)</span> |
| |
| <span class="c1">// Register this DataFrame as a table.</span> |
| <span class="n">people</span><span class="o">.</span><span class="na">registerTempTable</span><span class="o">(</span><span class="s">"people"</span><span class="o">);</span> |
| |
| <span class="c1">// SQL statements can be run by using the sql methods provided by sqlContext.</span> |
| <span class="n">DataFrame</span> <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="o">);</span> |
| |
| <span class="c1">// Alternatively, a DataFrame can be created for a JSON dataset represented by</span> |
| <span class="c1">// an RDD[String] storing one JSON object per string.</span> |
| <span class="n">List</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">jsonData</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span> |
| <span class="s">"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"</span><span class="o">);</span> |
| <span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">anotherPeopleRDD</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">parallelize</span><span class="o">(</span><span class="n">jsonData</span><span class="o">);</span> |
| <span class="n">DataFrame</span> <span class="n">anotherPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="n">anotherPeopleRDD</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| <p>Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. |
| This conversion can be done using <code>SQLContext.read.json</code> on a JSON file.</p> |
| |
| <p>Note that the file that is offered as <em>a json file</em> is not a typical JSON file. Each |
| line must contain a separate, self-contained valid JSON object. As a consequence, |
| a regular multi-line JSON file will most often fail.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sc is an existing SparkContext.</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SQLContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">SQLContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="c"># A JSON dataset is pointed to by path.</span> |
| <span class="c"># The path can be either a single text file or a directory storing text files.</span> |
| <span class="n">people</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="p">(</span><span class="s">"examples/src/main/resources/people.json"</span><span class="p">)</span> |
| |
| <span class="c"># The inferred schema can be visualized using the printSchema() method.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span> |
| <span class="c"># root</span> |
| <span class="c"># |-- age: integer (nullable = true)</span> |
| <span class="c"># |-- name: string (nullable = true)</span> |
| |
| <span class="c"># Register this DataFrame as a table.</span> |
| <span class="n">people</span><span class="o">.</span><span class="n">registerTempTable</span><span class="p">(</span><span class="s">"people"</span><span class="p">)</span> |
| |
| <span class="c"># SQL statements can be run by using the sql methods provided by `sqlContext`.</span> |
| <span class="n">teenagers</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="p">)</span> |
| |
| <span class="c"># Alternatively, a DataFrame can be created for a JSON dataset represented by</span> |
| <span class="c"># an RDD[String] storing one JSON object per string.</span> |
| <span class="n">anotherPeopleRDD</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">([</span> |
| <span class="s">'{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'</span><span class="p">])</span> |
| <span class="n">anotherPeople</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">jsonRDD</span><span class="p">(</span><span class="n">anotherPeopleRDD</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| <p>Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame using |
| the <code>jsonFile</code> function, which loads data from a directory of JSON files where each line of the |
| files is a JSON object.</p> |
| |
| <p>Note that the file that is offered as <em>a json file</em> is not a typical JSON file. Each |
| line must contain a separate, self-contained valid JSON object. As a consequence, |
| a regular multi-line JSON file will most often fail.</p> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r"><span class="c1"># sc is an existing SparkContext.</span> |
| sqlContext <span class="o"><-</span> sparkRSQL.init<span class="p">(</span>sc<span class="p">)</span> |
| |
| <span class="c1"># A JSON dataset is pointed to by path.</span> |
| <span class="c1"># The path can be either a single text file or a directory storing text files.</span> |
| path <span class="o"><-</span> <span class="s">"examples/src/main/resources/people.json"</span> |
| <span class="c1"># Create a DataFrame from the file(s) pointed to by path</span> |
| people <span class="o"><-</span> jsonFile<span class="p">(</span>sqlContext<span class="p">,</span> path<span class="p">)</span> |
| |
| <span class="c1"># The inferred schema can be visualized using the printSchema() method.</span> |
| printSchema<span class="p">(</span>people<span class="p">)</span> |
| <span class="c1"># root</span> |
| <span class="c1"># |-- age: integer (nullable = true)</span> |
| <span class="c1"># |-- name: string (nullable = true)</span> |
| |
| <span class="c1"># Register this DataFrame as a table.</span> |
| registerTempTable<span class="p">(</span>people<span class="p">,</span> <span class="s">"people"</span><span class="p">)</span> |
| |
| <span class="c1"># SQL statements can be run by using the sql methods provided by `sqlContext`.</span> |
| teenagers <span class="o"><-</span> sql<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"SELECT name FROM people WHERE age >= 13 AND age <= 19"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="sql"> |
| |
| <div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">TEMPORARY</span> <span class="k">TABLE</span> <span class="n">jsonTable</span> |
| <span class="k">USING</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">spark</span><span class="p">.</span><span class="k">sql</span><span class="p">.</span><span class="n">json</span> |
| <span class="k">OPTIONS</span> <span class="p">(</span> |
| <span class="n">path</span> <span class="ss">"examples/src/main/resources/people.json"</span> |
| <span class="p">)</span> |
| |
| <span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">jsonTable</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="hive-tables">Hive Tables</h2> |
| |
| <p>Spark SQL also supports reading and writing data stored in <a href="http://hive.apache.org/">Apache Hive</a>. |
| However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. |
| Hive support is enabled by adding the <code>-Phive</code> and <code>-Phive-thriftserver</code> flags to Spark’s build. |
| This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present |
| on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries |
| (SerDes) in order to access data stored in Hive.</p> |
| |
| <p>Configuration of Hive is done by placing your <code>hive-site.xml</code> file in <code>conf/</code>. Please note that when running |
| a query on a YARN cluster (<code>yarn-cluster</code> mode), the <code>datanucleus</code> jars under the <code>lib_managed/jars</code> directory |
| and <code>hive-site.xml</code> under the <code>conf/</code> directory need to be available on the driver and all executors launched by the |
| YARN cluster. A convenient way to do this is to add them through the <code>--jars</code> and <code>--files</code> options of the |
| <code>spark-submit</code> command, as shown below.</p> |
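| <p>A minimal sketch of such a submission (the exact <code>datanucleus</code> jar file names depend on your build, and |
| <code>&lt;main-class&gt;</code> and <code>&lt;application-jar&gt;</code> are placeholders for your own application):</p> |
| |
| <pre><code>./bin/spark-submit \ |
|   --master yarn-cluster \ |
|   --jars lib_managed/jars/datanucleus-api-jdo-&lt;version&gt;.jar,lib_managed/jars/datanucleus-core-&lt;version&gt;.jar,lib_managed/jars/datanucleus-rdbms-&lt;version&gt;.jar \ |
|   --files conf/hive-site.xml \ |
|   --class &lt;main-class&gt; &lt;application-jar&gt; |
| </code></pre> |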
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <p>When working with Hive one must construct a <code>HiveContext</code>, which inherits from <code>SQLContext</code>, and |
| adds support for finding tables in the MetaStore and writing queries using HiveQL. Users who do |
| not have an existing Hive deployment can still create a <code>HiveContext</code>. When not configured by the |
| hive-site.xml, the context automatically creates <code>metastore_db</code> and <code>warehouse</code> in the current |
| directory.</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// sc is an existing SparkContext.</span> |
| <span class="k">val</span> <span class="n">sqlContext</span> <span class="k">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">.</span><span class="n">hive</span><span class="o">.</span><span class="nc">HiveContext</span><span class="o">(</span><span class="n">sc</span><span class="o">)</span> |
| |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"</span><span class="o">)</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"</span><span class="o">)</span> |
| |
| <span class="c1">// Queries are expressed in HiveQL</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">"FROM src SELECT key, value"</span><span class="o">).</span><span class="n">collect</span><span class="o">().</span><span class="n">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>When working with Hive one must construct a <code>HiveContext</code>, which inherits from <code>SQLContext</code>, and |
| adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to |
| the <code>sql</code> method a <code>HiveContext</code> also provides an <code>hql</code> method, which allows queries to be |
| expressed in HiveQL.</p> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// sc is an existing JavaSparkContext.</span> |
| <span class="n">HiveContext</span> <span class="n">sqlContext</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">hive</span><span class="o">.</span><span class="na">HiveContext</span><span class="o">(</span><span class="n">sc</span><span class="o">.</span><span class="na">sc</span><span class="o">);</span> |
| |
| <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"</span><span class="o">);</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"</span><span class="o">);</span> |
| |
| <span class="c1">// Queries are expressed in HiveQL.</span> |
| <span class="n">Row</span><span class="o">[]</span> <span class="n">results</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">"FROM src SELECT key, value"</span><span class="o">).</span><span class="na">collect</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>When working with Hive one must construct a <code>HiveContext</code>, which inherits from <code>SQLContext</code>, and |
| adds support for finding tables in the MetaStore and writing queries using HiveQL.</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="c"># sc is an existing SparkContext.</span> |
| <span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">HiveContext</span> |
| <span class="n">sqlContext</span> <span class="o">=</span> <span class="n">HiveContext</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span> |
| |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"</span><span class="p">)</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"</span><span class="p">)</span> |
| |
| <span class="c"># Queries can be expressed in HiveQL.</span> |
| <span class="n">results</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"FROM src SELECT key, value"</span><span class="p">)</span><span class="o">.</span><span class="n">collect</span><span class="p">()</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <p>When working with Hive one must construct a <code>HiveContext</code>, which inherits from <code>SQLContext</code>, and |
| adds support for finding tables in the MetaStore and writing queries using HiveQL.</p> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r"><span class="c1"># sc is an existing SparkContext.</span> |
| sqlContext <span class="o"><-</span> sparkRHive.init<span class="p">(</span>sc<span class="p">)</span> |
| |
| sql<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"</span><span class="p">)</span> |
| sql<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"</span><span class="p">)</span> |
| |
| <span class="c1"># Queries can be expressed in HiveQL.</span> |
| results <span class="o"><-</span> collect<span class="p">(</span>sql<span class="p">(</span>sqlContext<span class="p">,</span> <span class="s">"FROM src SELECT key, value"</span><span class="p">))</span></code></pre></div> |
| |
| </div> |
| </div> |
| |
| <h3 id="interacting-with-different-versions-of-hive-metastore">Interacting with Different Versions of Hive Metastore</h3> |
| |
| <p>One of the most important pieces of Spark SQL’s Hive support is interaction with Hive metastore, |
| which enables Spark SQL to access metadata of Hive tables. Starting from Spark 1.4.0, a single binary |
| build of Spark SQL can be used to query different versions of Hive metastores, using the configuration described below. |
| Note that independent of the version of Hive that is being used to talk to the metastore, internally Spark SQL |
| will compile against Hive 1.2.1 and use those classes for internal execution (serdes, UDFs, UDAFs, etc).</p> |
| |
| <p>The following options can be used to configure the version of Hive that is used to retrieve metadata:</p> |
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>spark.sql.hive.metastore.version</code></td> |
| <td><code>1.2.1</code></td> |
| <td> |
| Version of the Hive metastore. Available |
| options are <code>0.12.0</code> through <code>1.2.1</code>. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.hive.metastore.jars</code></td> |
| <td><code>builtin</code></td> |
| <td> |
| Location of the jars that should be used to instantiate the HiveMetastoreClient. This |
| property can be one of three options: |
| <ol> |
| <li><code>builtin</code>: |
| Use Hive 1.2.1, which is bundled with the Spark assembly jar when <code>-Phive</code> is |
| enabled. When this option is chosen, <code>spark.sql.hive.metastore.version</code> must be |
| either <code>1.2.1</code> or not defined.</li> |
| <li><code>maven</code>: |
| Use Hive jars of the specified version, downloaded from Maven repositories. This configuration |
| is not generally recommended for production deployments.</li> |
| <li>A classpath in the standard format for the JVM. This classpath must include all of Hive |
| and its dependencies, including the correct version of Hadoop. These jars only need to be |
| present on the driver, but if you are running in yarn-cluster mode then you must ensure |
| they are packaged with your application.</li> |
| </ol> |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.hive.metastore.sharedPrefixes</code></td> |
| <td><code>com.mysql.jdbc,<br />org.postgresql,<br />com.microsoft.sqlserver,<br />oracle.jdbc</code></td> |
| <td> |
| <p> |
| A comma separated list of class prefixes that should be loaded using the classloader that is |
| shared between Spark SQL and a specific version of Hive. An example of classes that should |
| be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need |
| to be shared are those that interact with classes that are already shared, for example |
| custom appenders that are used by log4j. |
| </p> |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.hive.metastore.barrierPrefixes</code></td> |
| <td><code>(empty)</code></td> |
| <td> |
| <p> |
| A comma separated list of class prefixes that should explicitly be reloaded for each version |
| of Hive that Spark SQL is communicating with, for example Hive UDFs that are declared in a |
| prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>). |
| </p> |
| </td> |
| </tr> |
| </table> |
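| <p>For example, to talk to a Hive 0.13.1 metastore using Hive jars downloaded from Maven, the relevant |
| properties can be passed when the application is launched (a sketch only; the version shown is illustrative):</p> |
| |
| <pre><code>./bin/spark-shell \ |
|   --conf spark.sql.hive.metastore.version=0.13.1 \ |
|   --conf spark.sql.hive.metastore.jars=maven |
| </code></pre> |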
| |
| <h2 id="jdbc-to-other-databases">JDBC To Other Databases</h2> |
| |
| <p>Spark SQL also includes a data source that can read data from other databases using JDBC. This |
| functionality should be preferred over using <a href="api/scala/index.html#org.apache.spark.rdd.JdbcRDD">JdbcRDD</a>. |
| This is because the results are returned |
| as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources. |
| The JDBC data source is also easier to use from Java or Python as it does not require the user to |
| provide a ClassTag. |
| (Note that this is different than the Spark SQL JDBC server, which allows other applications to |
| run queries using Spark SQL).</p> |
| |
| <p>To get started you will need to include the JDBC driver for your particular database on the |
| Spark classpath. For example, to connect to PostgreSQL from the Spark shell you would run the |
| following command:</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">SPARK_CLASSPATH</span><span class="o">=</span>postgresql-9.3-1102-jdbc41.jar bin/spark-shell</code></pre></div> |
| |
| <p>Tables from the remote database can be loaded as a DataFrame or Spark SQL Temporary table using |
| the Data Sources API. The following options are supported:</p> |
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>url</code></td> |
| <td> |
| The JDBC URL to connect to. |
| </td> |
| </tr> |
| <tr> |
| <td><code>dbtable</code></td> |
| <td> |
| The JDBC table that should be read. Note that anything that is valid in a <code>FROM</code> clause of |
| a SQL query can be used. For example, instead of a full table you could also use a |
| subquery in parentheses. |
| </td> |
| </tr> |
| |
| <tr> |
| <td><code>driver</code></td> |
| <td> |
| The class name of the JDBC driver needed to connect to this URL. This class will be loaded |
| on the master and workers before running any JDBC commands, to allow the driver to |
| register itself with the JDBC subsystem. |
| </td> |
| </tr> |
| <tr> |
| <td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td> |
| <td> |
| These options must all be specified if any of them is specified. They describe how to |
| partition the table when reading in parallel from multiple workers. |
| <code>partitionColumn</code> must be a numeric column from the table in question. Notice |
| that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the |
| partition stride, not for filtering the rows in the table. So all rows in the table will be |
| partitioned and returned. |
| </td> |
| </tr> |
| </table> |
| |
| <div class="codetabs"> |
| |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">jdbcDF</span> <span class="k">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="s">"jdbc"</span><span class="o">).</span><span class="n">options</span><span class="o">(</span> |
| <span class="nc">Map</span><span class="o">(</span><span class="s">"url"</span> <span class="o">-></span> <span class="s">"jdbc:postgresql:dbserver"</span><span class="o">,</span> |
| <span class="s">"dbtable"</span> <span class="o">-></span> <span class="s">"schema.tablename"</span><span class="o">)).</span><span class="n">load</span><span class="o">()</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">></span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashMap</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>();</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"url"</span><span class="o">,</span> <span class="s">"jdbc:postgresql:dbserver"</span><span class="o">);</span> |
| <span class="n">options</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"dbtable"</span><span class="o">,</span> <span class="s">"schema.tablename"</span><span class="o">);</span> |
| |
| <span class="n">DataFrame</span> <span class="n">jdbcDF</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">format</span><span class="o">(</span><span class="s">"jdbc"</span><span class="o">).</span> <span class="n">options</span><span class="o">(</span><span class="n">options</span><span class="o">).</span><span class="na">load</span><span class="o">();</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">df</span> <span class="o">=</span> <span class="n">sqlContext</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="s">'jdbc'</span><span class="p">)</span><span class="o">.</span><span class="n">options</span><span class="p">(</span><span class="n">url</span><span class="o">=</span><span class="s">'jdbc:postgresql:dbserver'</span><span class="p">,</span> <span class="n">dbtable</span><span class="o">=</span><span class="s">'schema.tablename'</span><span class="p">)</span><span class="o">.</span><span class="n">load</span><span class="p">()</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <div class="highlight"><pre><code class="language-r" data-lang="r">df <span class="o"><-</span> loadDF<span class="p">(</span>sqlContext<span class="p">,</span> <span class="kn">source</span><span class="o">=</span><span class="s">"jdbc"</span><span class="p">,</span> url<span class="o">=</span><span class="s">"jdbc:postgresql:dbserver"</span><span class="p">,</span> dbtable<span class="o">=</span><span class="s">"schema.tablename"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="sql"> |
| |
| <div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">TEMPORARY</span> <span class="k">TABLE</span> <span class="n">jdbcTable</span> |
| <span class="k">USING</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">spark</span><span class="p">.</span><span class="k">sql</span><span class="p">.</span><span class="n">jdbc</span> |
| <span class="k">OPTIONS</span> <span class="p">(</span> |
| <span class="n">url</span> <span class="ss">"jdbc:postgresql:dbserver"</span><span class="p">,</span> |
| <span class="n">dbtable</span> <span class="ss">"schema.tablename"</span> |
| <span class="p">)</span></code></pre></div> |
| |
| </div> |
| </div> |
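| <p>The partitioning options described in the table above can be combined with the basic usage shown in the tabs. |
| A minimal sketch in Scala (the partition column <code>id</code> and the bounds are illustrative and must match your table):</p> |
| |
| <pre><code>val jdbcDF = sqlContext.read.format("jdbc").options( |
|   Map("url" -> "jdbc:postgresql:dbserver", |
|       "dbtable" -> "schema.tablename", |
|       "partitionColumn" -> "id", |
|       "lowerBound" -> "1", |
|       "upperBound" -> "100000", |
|       "numPartitions" -> "10")).load() |
| </code></pre> |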
| |
| <h2 id="troubleshooting">Troubleshooting</h2> |
| |
| <ul> |
| <li>The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java’s DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs.</li> |
| <li>Some databases, such as H2, convert all names to upper case. You’ll need to use upper case to refer to those names in Spark SQL.</li> |
| </ul> |
| |
| <h1 id="performance-tuning">Performance Tuning</h1> |
| |
| <p>For some workloads it is possible to improve performance by either caching data in memory, or by |
| turning on some experimental options.</p> |
| |
| <h2 id="caching-data-in-memory">Caching Data In Memory</h2> |
| |
| <p>Spark SQL can cache tables using an in-memory columnar format by calling <code>sqlContext.cacheTable("tableName")</code> or <code>dataFrame.cache()</code>. |
| Then Spark SQL will scan only required columns and will automatically tune compression to minimize |
| memory usage and GC pressure. You can call <code>sqlContext.uncacheTable("tableName")</code> to remove the table from memory.</p> |
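| <p>For example, assuming a temporary table named <code>people</code> has been registered as in the earlier examples, |
| a minimal sketch looks like:</p> |
| |
| <pre><code>// Mark the "people" table for in-memory columnar caching |
| sqlContext.cacheTable("people") |
| // Subsequent queries read the required columns from the cache |
| sqlContext.sql("SELECT name FROM people").collect() |
| // Release the cached data when it is no longer needed |
| sqlContext.uncacheTable("people") |
| </code></pre> |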
| |
| <p>Configuration of in-memory caching can be done using the <code>setConf</code> method on <code>SQLContext</code> or by running |
| <code>SET key=value</code> commands using SQL.</p> |
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td> |
| <td>true</td> |
| <td> |
| When set to true Spark SQL will automatically select a compression codec for each column based |
| on statistics of the data. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td> |
| <td>10000</td> |
| <td> |
| Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization |
| and compression, but risk OOMs when caching data. |
| </td> |
| </tr> |
| |
| </table> |
| |
| <h2 id="other-configuration-options">Other Configuration Options</h2> |
| |
| <p>The following options can also be used to tune the performance of query execution. It is possible |
| that these options will be deprecated in a future release as more optimizations are performed automatically.</p> |
| |
| <table class="table"> |
| <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> |
| <tr> |
| <td><code>spark.sql.autoBroadcastJoinThreshold</code></td> |
| <td>10485760 (10 MB)</td> |
| <td> |
| Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when |
| performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently |
| statistics are only supported for Hive Metastore tables where the command |
| <code>ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan</code> has been run. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.tungsten.enabled</code></td> |
| <td>true</td> |
| <td> |
| When true, use the optimized Tungsten physical execution backend which explicitly manages memory |
| and dynamically generates bytecode for expression evaluation. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.shuffle.partitions</code></td> |
| <td>200</td> |
| <td> |
| Configures the number of partitions to use when shuffling data for joins or aggregations. |
| </td> |
| </tr> |
| <tr> |
| <td><code>spark.sql.planner.externalSort</code></td> |
| <td>true</td> |
| <td> |
| When true, performs sorts that spill to disk as needed; otherwise, each partition is sorted in memory. |
| </td> |
| </tr> |
| </table> |
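| <p>For example, the number of shuffle partitions can be lowered for small datasets (a sketch; the value is |
| illustrative and depends on your data size and cluster):</p> |
| |
| <pre><code>// Use fewer partitions when shuffling small datasets |
| sqlContext.setConf("spark.sql.shuffle.partitions", "50") |
| </code></pre> |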
| |
| <h1 id="distributed-sql-engine">Distributed SQL Engine</h1> |
| |
| <p>Spark SQL can also act as a distributed query engine using its JDBC/ODBC or command-line interface. |
| In this mode, end-users or applications can interact with Spark SQL directly to run SQL queries, |
| without the need to write any code.</p> |
| |
| <h2 id="running-the-thrift-jdbcodbc-server">Running the Thrift JDBC/ODBC server</h2> |
| |
| <p>The Thrift JDBC/ODBC server implemented here corresponds to the <a href="https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2"><code>HiveServer2</code></a> |
| in Hive 1.2.1. You can test the JDBC server with the beeline script that comes with either Spark or Hive 1.2.1.</p> |
| |
| <p>To start the JDBC/ODBC server, run the following in the Spark directory:</p> |
| |
| <pre><code>./sbin/start-thriftserver.sh |
| </code></pre> |
| |
| <p>This script accepts all <code>bin/spark-submit</code> command line options, plus a <code>--hiveconf</code> option to |
| specify Hive properties. You may run <code>./sbin/start-thriftserver.sh --help</code> for a complete list of |
| all available options. By default, the server listens on localhost:10000. You may override this |
| behaviour via either environment variables, i.e.:</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nb">export </span><span class="nv">HIVE_SERVER2_THRIFT_PORT</span><span class="o">=</span><listening-port> |
| <span class="nb">export </span><span class="nv">HIVE_SERVER2_THRIFT_BIND_HOST</span><span class="o">=</span><listening-host> |
| ./sbin/start-thriftserver.sh <span class="se">\</span> |
| --master <master-uri> <span class="se">\</span> |
| ...</code></pre></div> |
| |
| <p>or system properties:</p> |
| |
| <div class="highlight"><pre><code class="language-bash" data-lang="bash">./sbin/start-thriftserver.sh <span class="se">\</span> |
| --hiveconf hive.server2.thrift.port<span class="o">=</span><listening-port> <span class="se">\</span> |
| --hiveconf hive.server2.thrift.bind.host<span class="o">=</span><listening-host> <span class="se">\</span> |
| --master <master-uri> |
| ...</code></pre></div> |
| |
| <p>Now you can use beeline to test the Thrift JDBC/ODBC server:</p> |
| |
| <pre><code>./bin/beeline |
| </code></pre> |
| |
| <p>Connect to the JDBC/ODBC server in beeline with:</p> |
| |
| <pre><code>beeline> !connect jdbc:hive2://localhost:10000 |
| </code></pre> |
| |
| <p>Beeline will ask you for a username and password. In non-secure mode, simply enter the username on |
| your machine and a blank password. For secure mode, please follow the instructions given in the |
| <a href="https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients">beeline documentation</a>.</p> |
| |
| <p>Configuration of Hive is done by placing your <code>hive-site.xml</code> file in <code>conf/</code>.</p> |
| |
| <p>You may also use the beeline script that comes with Hive.</p> |
| |
| <p>Thrift JDBC server also supports sending thrift RPC messages over HTTP transport. |
| Use the following setting to enable HTTP mode as system property or in <code>hive-site.xml</code> file in <code>conf/</code>:</p> |
| |
| <pre><code>hive.server2.transport.mode - Set this to value: http |
| hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001 |
| hive.server2.http.endpoint - HTTP endpoint; default is cliservice |
| </code></pre> |
| |
| <p>To test, use beeline to connect to the JDBC/ODBC server in http mode with:</p> |
| |
| <pre><code>beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint> |
| </code></pre> |
| |
| <h2 id="running-the-spark-sql-cli">Running the Spark SQL CLI</h2> |
| |
| <p>The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute |
| queries input from the command line. Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.</p> |
| |
| <p>To start the Spark SQL CLI, run the following in the Spark directory:</p> |
| |
| <pre><code>./bin/spark-sql |
| </code></pre> |
| |
| <p>Configuration of Hive is done by placing your <code>hive-site.xml</code> file in <code>conf/</code>. |
| You may run <code>./bin/spark-sql --help</code> for a complete list of all available |
| options.</p> |
| |
| <h1 id="migration-guide">Migration Guide</h1> |
| |
| <h2 id="upgrading-from-spark-sql-14-to-15">Upgrading From Spark SQL 1.4 to 1.5</h2> |
| |
| <ul> |
| <li>Optimized execution using manually managed memory (Tungsten) is now enabled by default, along with |
| code generation for expression evaluation. These features can both be disabled by setting |
| <code>spark.sql.tungsten.enabled</code> to <code>false</code>.</li> |
| <li>Parquet schema merging is no longer enabled by default. It can be re-enabled by setting |
| <code>spark.sql.parquet.mergeSchema</code> to <code>true</code>.</li> |
| <li>Resolution of strings to columns in python now supports using dots (<code>.</code>) to qualify the column or |
| access nested values. For example <code>df['table.column.nestedField']</code>. However, this means that if |
| your column name contains any dots you must now escape them using backticks (e.g., <code>table.`column.with.dots`.nested</code>).</li> |
| <li>In-memory columnar storage partition pruning is on by default. It can be disabled by setting |
| <code>spark.sql.inMemoryColumnarStorage.partitionPruning</code> to <code>false</code>.</li> |
| <li>Unlimited precision decimal columns are no longer supported, instead Spark SQL enforces a maximum |
| precision of 38. When inferring schema from <code>BigDecimal</code> objects, a precision of (38, 18) is now |
| used. When no precision is specified in DDL then the default remains <code>Decimal(10, 0)</code>.</li> |
| <li>Timestamps are now stored at a precision of 1us, rather than 1ns.</li> |
| <li>In the <code>sql</code> dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains |
| unchanged.</li> |
| <li>The canonical names of SQL/DataFrame functions are now lower case (e.g. sum vs SUM).</li> |
| <li>It has been determined that using the DirectOutputCommitter when speculation is enabled is unsafe |
| and thus this output committer will not be used when speculation is on, independent of configuration.</li> |
| <li>JSON data source will not automatically load new files that are created by other applications |
| (i.e. files that are not inserted to the dataset through Spark SQL). |
| For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore), |
| users can use <code>REFRESH TABLE</code> SQL command or <code>HiveContext</code>’s <code>refreshTable</code> method |
| to include those new files to the table. For a DataFrame representing a JSON dataset, users need to recreate |
| the DataFrame and the new DataFrame will include new files.</li> |
| </ul> |
| |
| <h2 id="upgrading-from-spark-sql-13-to-14">Upgrading from Spark SQL 1.3 to 1.4</h2> |
| |
| <h4 id="dataframe-data-readerwriter-interface">DataFrame data reader/writer interface</h4> |
| |
| <p>Based on user feedback, we created a new, more fluid API for reading data in (<code>SQLContext.read</code>) |
| and writing data out (<code>DataFrame.write</code>), |
| and deprecated the old APIs (e.g. <code>SQLContext.parquetFile</code>, <code>SQLContext.jsonFile</code>).</p> |
| |
| <p>See the API docs for <code>SQLContext.read</code> ( |
| <a href="api/scala/index.html#org.apache.spark.sql.SQLContext@read:DataFrameReader">Scala</a>, |
| <a href="api/java/org/apache/spark/sql/SQLContext.html#read()">Java</a>, |
| <a href="api/python/pyspark.sql.html#pyspark.sql.SQLContext.read">Python</a> |
| ) and <code>DataFrame.write</code> ( |
| <a href="api/scala/index.html#org.apache.spark.sql.DataFrame@write:DataFrameWriter">Scala</a>, |
| <a href="api/java/org/apache/spark/sql/DataFrame.html#write()">Java</a>, |
| <a href="api/python/pyspark.sql.html#pyspark.sql.DataFrame.write">Python</a> |
| ) for more information.</p> |
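| <p>A minimal before-and-after sketch (the file paths are illustrative):</p> |
| |
| <pre><code>// 1.3 style (now deprecated) |
| val df1 = sqlContext.parquetFile("people.parquet") |
| |
| // 1.4+ style, using the DataFrameReader / DataFrameWriter interface |
| val df2 = sqlContext.read.parquet("people.parquet") |
| df2.write.format("json").save("people-json") |
| </code></pre> |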
| |
| <h4 id="dataframegroupby-retains-grouping-columns">DataFrame.groupBy retains grouping columns</h4> |
| |
| <p>Based on user feedback, we changed the default behavior of <code>DataFrame.groupBy().agg()</code> to retain the |
| grouping columns in the resulting <code>DataFrame</code>. To keep the behavior in 1.3, set <code>spark.sql.retainGroupColumns</code> to <code>false</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// In 1.3.x, in order for the grouping column "department" to show up,</span> |
| <span class="c1">// it must be included explicitly as part of the agg function call.</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"department"</span><span class="o">).</span><span class="n">agg</span><span class="o">(</span><span class="n">$</span><span class="s">"department"</span><span class="o">,</span> <span class="n">max</span><span class="o">(</span><span class="s">"age"</span><span class="o">),</span> <span class="n">sum</span><span class="o">(</span><span class="s">"expense"</span><span class="o">))</span> |
| |
| <span class="c1">// In 1.4+, grouping column "department" is included automatically.</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"department"</span><span class="o">).</span><span class="n">agg</span><span class="o">(</span><span class="n">max</span><span class="o">(</span><span class="s">"age"</span><span class="o">),</span> <span class="n">sum</span><span class="o">(</span><span class="s">"expense"</span><span class="o">))</span> |
| |
| <span class="c1">// Revert to 1.3 behavior (not retaining grouping column) by:</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">setConf</span><span class="o">(</span><span class="s">"spark.sql.retainGroupColumns"</span><span class="o">,</span> <span class="s">"false"</span><span class="o">)</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// In 1.3.x, in order for the grouping column "department" to show up,</span> |
| <span class="c1">// it must be included explicitly as part of the agg function call.</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"department"</span><span class="o">).</span><span class="na">agg</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"department"</span><span class="o">),</span> <span class="n">max</span><span class="o">(</span><span class="s">"age"</span><span class="o">),</span> <span class="n">sum</span><span class="o">(</span><span class="s">"expense"</span><span class="o">));</span> |
| |
| <span class="c1">// In 1.4+, grouping column "department" is included automatically.</span> |
| <span class="n">df</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">"department"</span><span class="o">).</span><span class="na">agg</span><span class="o">(</span><span class="n">max</span><span class="o">(</span><span class="s">"age"</span><span class="o">),</span> <span class="n">sum</span><span class="o">(</span><span class="s">"expense"</span><span class="o">));</span> |
| |
| <span class="c1">// Revert to 1.3 behavior (not retaining grouping column) by:</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="na">setConf</span><span class="o">(</span><span class="s">"spark.sql.retainGroupColumns"</span><span class="o">,</span> <span class="s">"false"</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">import</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">as</span> <span class="nn">func</span> |
| |
| <span class="c"># In 1.3.x, in order for the grouping column "department" to show up,</span> |
| <span class="c"># it must be included explicitly as part of the agg function call.</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="p">(</span><span class="s">"department"</span><span class="p">)</span><span class="o">.</span><span class="n">agg</span><span class="p">(</span><span class="s">"department"</span><span class="p">),</span> <span class="n">func</span><span class="o">.</span><span class="n">max</span><span class="p">(</span><span class="s">"age"</span><span class="p">),</span> <span class="n">func</span><span class="o">.</span><span class="n">sum</span><span class="p">(</span><span class="s">"expense"</span><span class="p">))</span> |
| |
| <span class="c"># In 1.4+, grouping column "department" is included automatically.</span> |
| <span class="n">df</span><span class="o">.</span><span class="n">groupBy</span><span class="p">(</span><span class="s">"department"</span><span class="p">)</span><span class="o">.</span><span class="n">agg</span><span class="p">(</span><span class="n">func</span><span class="o">.</span><span class="n">max</span><span class="p">(</span><span class="s">"age"</span><span class="p">),</span> <span class="n">func</span><span class="o">.</span><span class="n">sum</span><span class="p">(</span><span class="s">"expense"</span><span class="p">))</span> |
| |
| <span class="c"># Revert to 1.3.x behavior (not retaining grouping column) by:</span> |
| <span class="n">sqlContext</span><span class="o">.</span><span class="n">setConf</span><span class="p">(</span><span class="s">"spark.sql.retainGroupColumns"</span><span class="p">,</span> <span class="s">"false"</span><span class="p">)</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <h2 id="upgrading-from-spark-sql-10-12-to-13">Upgrading from Spark SQL 1.0-1.2 to 1.3</h2> |
| |
| <p>In Spark 1.3 we removed the “Alpha” label from Spark SQL and as part of this did a cleanup of the |
| available APIs. From Spark 1.3 onwards, Spark SQL will provide binary compatibility with other |
| releases in the 1.X series. This compatibility guarantee excludes APIs that are explicitly marked |
| as unstable (i.e., DeveloperAPI or Experimental).</p> |
| |
| <h4 id="rename-of-schemardd-to-dataframe">Rename of SchemaRDD to DataFrame</h4> |
| |
| <p>The largest change that users will notice when upgrading to Spark SQL 1.3 is that <code>SchemaRDD</code> has |
| been renamed to <code>DataFrame</code>. This is primarily because DataFrames no longer inherit from RDD |
directly, but instead provide most of the functionality that RDDs provide through their own
| implementation. DataFrames can still be converted to RDDs by calling the <code>.rdd</code> method.</p> |
| |
| <p>In Scala there is a type alias from <code>SchemaRDD</code> to <code>DataFrame</code> to provide source compatibility for |
| some use cases. It is still recommended that users update their code to use <code>DataFrame</code> instead. |
| Java and Python users will need to update their code.</p> |
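
<p>Where an <code>RDD</code> is still required, the conversion is a single method call (a minimal sketch, assuming an existing <code>DataFrame</code> named <code>df</code>):</p>

<pre><code>import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// DataFrames no longer extend RDD; call .rdd to get an RDD[Row] when needed
val rowRDD: RDD[Row] = df.rdd
</code></pre>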
| |
| <h4 id="unification-of-the-java-and-scala-apis">Unification of the Java and Scala APIs</h4> |
| |
| <p>Prior to Spark 1.3 there were separate Java compatible classes (<code>JavaSQLContext</code> and <code>JavaSchemaRDD</code>) |
| that mirrored the Scala API. In Spark 1.3 the Java API and Scala API have been unified. Users |
of either language should use <code>SQLContext</code> and <code>DataFrame</code>. In general these classes try to
| use types that are usable from both languages (i.e. <code>Array</code> instead of language specific collections). |
| In some cases where no common type exists (e.g., for passing in closures or Maps) function overloading |
| is used instead.</p> |
| |
<p>Additionally, the Java-specific types API has been removed. Users of both Scala and Java should
use the classes present in <code>org.apache.spark.sql.types</code> to describe schemas programmatically.</p>
| |
| <h4 id="isolation-of-implicit-conversions-and-removal-of-dsl-package-scala-only">Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)</h4> |
| |
| <p>Many of the code examples prior to Spark 1.3 started with <code>import sqlContext._</code>, which brought |
| all of the functions from sqlContext into scope. In Spark 1.3 we have isolated the implicit |
| conversions for converting <code>RDD</code>s into <code>DataFrame</code>s into an object inside of the <code>SQLContext</code>. |
| Users should now write <code>import sqlContext.implicits._</code>.</p> |
| |
| <p>Additionally, the implicit conversions now only augment RDDs that are composed of <code>Product</code>s (i.e., |
| case classes or tuples) with a method <code>toDF</code>, instead of applying automatically.</p> |
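
<p>A minimal sketch of the new import and the explicit <code>toDF</code> call (assuming a <code>SparkContext</code> named <code>sc</code> and a <code>SQLContext</code> named <code>sqlContext</code>):</p>

<pre><code>import sqlContext.implicits._

case class Person(name: String, age: Int)

// The implicit conversions add toDF to RDDs of Products (case classes or tuples)
val peopleDF = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 25))).toDF()
</code></pre>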
| |
<p>When using functions inside the DSL (now replaced with the <code>DataFrame</code> API), users used to import
<code>org.apache.spark.sql.catalyst.dsl</code>. Instead, the public DataFrame functions API should be used:
<code>import org.apache.spark.sql.functions._</code>.</p>
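
<p>For example, functions from the public API can be applied directly to columns (a minimal sketch, assuming a <code>DataFrame</code> named <code>df</code> with columns <code>name</code> and <code>age</code>):</p>

<pre><code>import org.apache.spark.sql.functions._

df.select(upper(col("name")), col("age") + 1)
df.agg(max("age"), avg("age"))
</code></pre>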
| |
| <h4 id="removal-of-the-type-aliases-in-orgapachesparksql-for-datatype-scala-only">Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)</h4> |
| |
| <p>Spark 1.3 removes the type aliases that were present in the base sql package for <code>DataType</code>. Users |
should instead import the classes in <code>org.apache.spark.sql.types</code>.</p>
| |
| <h4 id="udf-registration-moved-to-sqlcontextudf-java--scala">UDF Registration Moved to <code>sqlContext.udf</code> (Java & Scala)</h4> |
| |
| <p>Functions that are used to register UDFs, either for use in the DataFrame DSL or SQL, have been |
moved into the <code>udf</code> object in <code>SQLContext</code>.</p>
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">sqlContext</span><span class="o">.</span><span class="n">udf</span><span class="o">.</span><span class="n">register</span><span class="o">(</span><span class="s">"strLen"</span><span class="o">,</span> <span class="o">(</span><span class="n">s</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span> <span class="k">=></span> <span class="n">s</span><span class="o">.</span><span class="n">length</span><span class="o">())</span></code></pre></div> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">sqlContext</span><span class="o">.</span><span class="na">udf</span><span class="o">().</span><span class="na">register</span><span class="o">(</span><span class="s">"strLen"</span><span class="o">,</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">-></span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">(),</span> <span class="n">DataTypes</span><span class="o">.</span><span class="na">IntegerType</span><span class="o">);</span></code></pre></div> |
| |
| </div> |
| |
| </div> |
| |
| <p>Python UDF registration is unchanged.</p> |
| |
| <h4 id="python-datatypes-no-longer-singletons">Python DataTypes No Longer Singletons</h4> |
| |
<p>When using DataTypes in Python you will need to construct them (e.g. <code>StringType()</code>) instead of
| referencing a singleton.</p> |
| |
| <h2 id="migration-guide-for-shark-users">Migration Guide for Shark Users</h2> |
| |
| <h3 id="scheduling">Scheduling</h3> |
| <p>To set a <a href="job-scheduling.html#fair-scheduler-pools">Fair Scheduler</a> pool for a JDBC client session, |
| users can set the <code>spark.sql.thriftserver.scheduler.pool</code> variable:</p> |
| |
| <pre><code>SET spark.sql.thriftserver.scheduler.pool=accounting; |
| </code></pre> |
| |
| <h3 id="reducer-number">Reducer number</h3> |
| |
<p>In Shark, the default reducer number is 1 and is controlled by the property <code>mapred.reduce.tasks</code>. Spark
| SQL deprecates this property in favor of <code>spark.sql.shuffle.partitions</code>, whose default value |
| is 200. Users may customize this property via <code>SET</code>:</p> |
| |
| <pre><code>SET spark.sql.shuffle.partitions=10; |
| SELECT page, count(*) c |
| FROM logs_last_month_cached |
| GROUP BY page ORDER BY c DESC LIMIT 10; |
| </code></pre> |
| |
| <p>You may also put this property in <code>hive-site.xml</code> to override the default value.</p> |
| |
| <p>For now, the <code>mapred.reduce.tasks</code> property is still recognized, and is converted to |
| <code>spark.sql.shuffle.partitions</code> automatically.</p> |
| |
| <h3 id="caching">Caching</h3> |
| |
| <p>The <code>shark.cache</code> table property no longer exists, and tables whose name end with <code>_cached</code> are no |
| longer automatically cached. Instead, we provide <code>CACHE TABLE</code> and <code>UNCACHE TABLE</code> statements to |
| let user control table caching explicitly:</p> |
| |
| <pre><code>CACHE TABLE logs_last_month; |
| UNCACHE TABLE logs_last_month; |
| </code></pre> |
| |
<p><strong>NOTE:</strong> <code>CACHE TABLE tbl</code> is now <strong>eager</strong> by default, not <strong>lazy</strong>. You no longer need to trigger cache materialization manually.</p>
| |
<p>Since Spark 1.2.0, Spark SQL provides a statement that lets users control whether table caching is lazy:</p>
| |
| <pre><code>CACHE [LAZY] TABLE [AS SELECT] ... |
| </code></pre> |
| |
| <p>Several caching related features are not supported yet:</p> |
| |
| <ul> |
| <li>User defined partition level cache eviction policy</li> |
| <li>RDD reloading</li> |
| <li>In-memory cache write through policy</li> |
| </ul> |
| |
| <h2 id="compatibility-with-apache-hive">Compatibility with Apache Hive</h2> |
| |
| <p>Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. |
Currently, Hive SerDes and UDFs are based on Hive 1.2.1,
and Spark SQL can be connected to different versions of the Hive Metastore
(from 0.12.0 to 1.2.1; also see http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore).</p>
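
<p>The metastore version is controlled through configuration, for example (a minimal sketch; the version shown is illustrative, and these properties should be set before the <code>HiveContext</code> is created):</p>

<pre><code>val conf = new org.apache.spark.SparkConf()
  .set("spark.sql.hive.metastore.version", "0.13.1")
  .set("spark.sql.hive.metastore.jars", "maven")
</code></pre>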
| |
| <h4 id="deploying-in-existing-hive-warehouses">Deploying in Existing Hive Warehouses</h4> |
| |
| <p>The Spark SQL Thrift JDBC server is designed to be “out of the box” compatible with existing Hive |
| installations. You do not need to modify your existing Hive Metastore or change the data placement |
| or partitioning of your tables.</p> |
| |
| <h3 id="supported-hive-features">Supported Hive Features</h3> |
| |
| <p>Spark SQL supports the vast majority of Hive features, such as:</p> |
| |
| <ul> |
| <li>Hive query statements, including: |
| <ul> |
| <li><code>SELECT</code></li> |
| <li><code>GROUP BY</code></li> |
| <li><code>ORDER BY</code></li> |
| <li><code>CLUSTER BY</code></li> |
| <li><code>SORT BY</code></li> |
| </ul> |
| </li> |
| <li>All Hive operators, including: |
| <ul> |
      <li>Relational operators (<code>=</code>, <code>&lt;=&gt;</code>, <code>==</code>, <code><></code>, <code><</code>, <code>></code>, <code>>=</code>, <code><=</code>, etc)</li>
| <li>Arithmetic operators (<code>+</code>, <code>-</code>, <code>*</code>, <code>/</code>, <code>%</code>, etc)</li> |
| <li>Logical operators (<code>AND</code>, <code>&&</code>, <code>OR</code>, <code>||</code>, etc)</li> |
| <li>Complex type constructors</li> |
| <li>Mathematical functions (<code>sign</code>, <code>ln</code>, <code>cos</code>, etc)</li> |
| <li>String functions (<code>instr</code>, <code>length</code>, <code>printf</code>, etc)</li> |
| </ul> |
| </li> |
| <li>User defined functions (UDF)</li> |
| <li>User defined aggregation functions (UDAF)</li> |
| <li>User defined serialization formats (SerDes)</li> |
| <li>Window functions</li> |
| <li>Joins |
| <ul> |
| <li><code>JOIN</code></li> |
| <li><code>{LEFT|RIGHT|FULL} OUTER JOIN</code></li> |
| <li><code>LEFT SEMI JOIN</code></li> |
| <li><code>CROSS JOIN</code></li> |
| </ul> |
| </li> |
| <li>Unions</li> |
| <li>Sub-queries |
| <ul> |
| <li><code>SELECT col FROM ( SELECT a + b AS col from t1) t2</code></li> |
| </ul> |
| </li> |
| <li>Sampling</li> |
| <li>Explain</li> |
| <li>Partitioned tables including dynamic partition insertion</li> |
| <li>View</li> |
| <li>All Hive DDL Functions, including: |
| <ul> |
| <li><code>CREATE TABLE</code></li> |
| <li><code>CREATE TABLE AS SELECT</code></li> |
| <li><code>ALTER TABLE</code></li> |
| </ul> |
| </li> |
| <li>Most Hive Data types, including: |
| <ul> |
| <li><code>TINYINT</code></li> |
| <li><code>SMALLINT</code></li> |
| <li><code>INT</code></li> |
| <li><code>BIGINT</code></li> |
| <li><code>BOOLEAN</code></li> |
| <li><code>FLOAT</code></li> |
| <li><code>DOUBLE</code></li> |
| <li><code>STRING</code></li> |
| <li><code>BINARY</code></li> |
| <li><code>TIMESTAMP</code></li> |
| <li><code>DATE</code></li> |
| <li><code>ARRAY<></code></li> |
| <li><code>MAP<></code></li> |
| <li><code>STRUCT<></code></li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h3 id="unsupported-hive-functionality">Unsupported Hive Functionality</h3> |
| |
| <p>Below is a list of Hive features that we don’t support yet. Most of these features are rarely used |
| in Hive deployments.</p> |
| |
| <p><strong>Major Hive Features</strong></p> |
| |
| <ul> |
  <li>Tables with buckets: bucketing is hash partitioning within a Hive table partition. Spark SQL
doesn’t support buckets yet.</li>
| </ul> |
| |
| <p><strong>Esoteric Hive Features</strong></p> |
| |
| <ul> |
| <li><code>UNION</code> type</li> |
| <li>Unique join</li> |
| <li>Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at |
| the moment and only supports populating the sizeInBytes field of the hive metastore.</li> |
| </ul> |
| |
| <p><strong>Hive Input/Output Formats</strong></p> |
| |
| <ul> |
  <li>File format for CLI: for results displayed back in the CLI, Spark SQL only supports TextOutputFormat.</li>
| <li>Hadoop archive</li> |
| </ul> |
| |
| <p><strong>Hive Optimizations</strong></p> |
| |
| <p>A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are |
| less important due to Spark SQL’s in-memory computational model. Others are slotted for future |
| releases of Spark SQL.</p> |
| |
| <ul> |
| <li>Block level bitmap indexes and virtual columns (used to build indexes)</li> |
| <li>Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you |
| need to control the degree of parallelism post-shuffle using “<code>SET spark.sql.shuffle.partitions=[num_tasks];</code>”.</li> |
| <li>Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still |
| launches tasks to compute the result.</li> |
| <li>Skew data flag: Spark SQL does not follow the skew data flags in Hive.</li> |
| <li><code>STREAMTABLE</code> hint in join: Spark SQL does not follow the <code>STREAMTABLE</code> hint.</li> |
| <li>Merge multiple small files for query results: if the result output contains multiple small files, |
| Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS |
| metadata. Spark SQL does not support that.</li> |
| </ul> |
| |
| <h1 id="reference">Reference</h1> |
| |
| <h2 id="data-types">Data Types</h2> |
| |
| <p>Spark SQL and DataFrames support the following data types:</p> |
| |
| <ul> |
| <li>Numeric types |
| <ul> |
| <li><code>ByteType</code>: Represents 1-byte signed integer numbers. |
| The range of numbers is from <code>-128</code> to <code>127</code>.</li> |
| <li><code>ShortType</code>: Represents 2-byte signed integer numbers. |
| The range of numbers is from <code>-32768</code> to <code>32767</code>.</li> |
| <li><code>IntegerType</code>: Represents 4-byte signed integer numbers. |
| The range of numbers is from <code>-2147483648</code> to <code>2147483647</code>.</li> |
| <li><code>LongType</code>: Represents 8-byte signed integer numbers. |
| The range of numbers is from <code>-9223372036854775808</code> to <code>9223372036854775807</code>.</li> |
| <li><code>FloatType</code>: Represents 4-byte single-precision floating point numbers.</li> |
| <li><code>DoubleType</code>: Represents 8-byte double-precision floating point numbers.</li> |
| <li><code>DecimalType</code>: Represents arbitrary-precision signed decimal numbers. Backed internally by <code>java.math.BigDecimal</code>. A <code>BigDecimal</code> consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.</li> |
| </ul> |
| </li> |
| <li>String type |
| <ul> |
| <li><code>StringType</code>: Represents character string values.</li> |
| </ul> |
| </li> |
| <li>Binary type |
| <ul> |
| <li><code>BinaryType</code>: Represents byte sequence values.</li> |
| </ul> |
| </li> |
| <li>Boolean type |
| <ul> |
| <li><code>BooleanType</code>: Represents boolean values.</li> |
| </ul> |
| </li> |
| <li>Datetime type |
| <ul> |
      <li><code>TimestampType</code>: Represents values comprising the fields year, month, day,
hour, minute, and second.</li>
      <li><code>DateType</code>: Represents values comprising the fields year, month, and day.</li>
| </ul> |
| </li> |
| <li>Complex types |
| <ul> |
      <li><code>ArrayType(elementType, containsNull)</code>: Represents values comprising a sequence of
elements with the type of <code>elementType</code>. <code>containsNull</code> is used to indicate if
elements in an <code>ArrayType</code> value can have <code>null</code> values.</li>
      <li><code>MapType(keyType, valueType, valueContainsNull)</code>:
Represents values comprising a set of key-value pairs. The data type of keys is
described by <code>keyType</code> and the data type of values is described by <code>valueType</code>.
For a <code>MapType</code> value, keys are not allowed to have <code>null</code> values. <code>valueContainsNull</code>
is used to indicate if values of a <code>MapType</code> value can have <code>null</code> values.</li>
| <li><code>StructType(fields)</code>: Represents values with the structure described by |
| a sequence of <code>StructField</code>s (<code>fields</code>). |
| <ul> |
| <li><code>StructField(name, dataType, nullable)</code>: Represents a field in a <code>StructType</code>. |
| The name of a field is indicated by <code>name</code>. The data type of a field is indicated |
by <code>dataType</code>. <code>nullable</code> is used to indicate if values of this field can have
| <code>null</code> values.</li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| </ul> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <p>All data types of Spark SQL are located in the package <code>org.apache.spark.sql.types</code>. |
| You can access them by doing</p> |
| |
| <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark.sql.types._</span></code></pre></div> |
| |
| <table class="table"> |
| <tr> |
| <th style="width:20%">Data type</th> |
| <th style="width:40%">Value type in Scala</th> |
| <th>API to access or create a data type</th></tr> |
| <tr> |
| <td> <b>ByteType</b> </td> |
| <td> Byte </td> |
| <td> |
| ByteType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ShortType</b> </td> |
| <td> Short </td> |
| <td> |
| ShortType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>IntegerType</b> </td> |
| <td> Int </td> |
| <td> |
| IntegerType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>LongType</b> </td> |
| <td> Long </td> |
| <td> |
| LongType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>FloatType</b> </td> |
| <td> Float </td> |
| <td> |
| FloatType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DoubleType</b> </td> |
| <td> Double </td> |
| <td> |
| DoubleType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DecimalType</b> </td> |
| <td> java.math.BigDecimal </td> |
| <td> |
| DecimalType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StringType</b> </td> |
| <td> String </td> |
| <td> |
| StringType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BinaryType</b> </td> |
| <td> Array[Byte] </td> |
| <td> |
| BinaryType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BooleanType</b> </td> |
| <td> Boolean </td> |
| <td> |
| BooleanType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>TimestampType</b> </td> |
| <td> java.sql.Timestamp </td> |
| <td> |
| TimestampType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DateType</b> </td> |
| <td> java.sql.Date </td> |
| <td> |
| DateType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ArrayType</b> </td> |
| <td> scala.collection.Seq </td> |
| <td> |
| ArrayType(<i>elementType</i>, [<i>containsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>containsNull</i> is <i>true</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>MapType</b> </td> |
| <td> scala.collection.Map </td> |
| <td> |
| MapType(<i>keyType</i>, <i>valueType</i>, [<i>valueContainsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>true</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructType</b> </td> |
| <td> org.apache.spark.sql.Row </td> |
| <td> |
| StructType(<i>fields</i>)<br /> |
| <b>Note:</b> <i>fields</i> is a Seq of StructFields. Also, two fields with the same |
| name are not allowed. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructField</b> </td> |
| <td> The value type in Scala of the data type of this field |
| (For example, Int for a StructField with the data type IntegerType) </td> |
| <td> |
| StructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>) |
| </td> |
| </tr> |
| </table> |
| |
| </div> |
| |
| <div data-lang="java"> |
| |
| <p>All data types of Spark SQL are located in the package of |
| <code>org.apache.spark.sql.types</code>. To access or create a data type, |
| please use factory methods provided in |
| <code>org.apache.spark.sql.types.DataTypes</code>.</p> |
| |
| <table class="table"> |
| <tr> |
| <th style="width:20%">Data type</th> |
| <th style="width:40%">Value type in Java</th> |
| <th>API to access or create a data type</th></tr> |
| <tr> |
| <td> <b>ByteType</b> </td> |
| <td> byte or Byte </td> |
| <td> |
| DataTypes.ByteType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ShortType</b> </td> |
| <td> short or Short </td> |
| <td> |
| DataTypes.ShortType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>IntegerType</b> </td> |
| <td> int or Integer </td> |
| <td> |
| DataTypes.IntegerType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>LongType</b> </td> |
| <td> long or Long </td> |
| <td> |
| DataTypes.LongType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>FloatType</b> </td> |
| <td> float or Float </td> |
| <td> |
| DataTypes.FloatType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DoubleType</b> </td> |
| <td> double or Double </td> |
| <td> |
| DataTypes.DoubleType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DecimalType</b> </td> |
| <td> java.math.BigDecimal </td> |
| <td> |
| DataTypes.createDecimalType()<br /> |
| DataTypes.createDecimalType(<i>precision</i>, <i>scale</i>). |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StringType</b> </td> |
| <td> String </td> |
| <td> |
| DataTypes.StringType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BinaryType</b> </td> |
| <td> byte[] </td> |
| <td> |
| DataTypes.BinaryType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BooleanType</b> </td> |
| <td> boolean or Boolean </td> |
| <td> |
| DataTypes.BooleanType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>TimestampType</b> </td> |
| <td> java.sql.Timestamp </td> |
| <td> |
| DataTypes.TimestampType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DateType</b> </td> |
| <td> java.sql.Date </td> |
| <td> |
| DataTypes.DateType |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ArrayType</b> </td> |
| <td> java.util.List </td> |
| <td> |
| DataTypes.createArrayType(<i>elementType</i>)<br /> |
| <b>Note:</b> The value of <i>containsNull</i> will be <i>true</i><br /> |
| DataTypes.createArrayType(<i>elementType</i>, <i>containsNull</i>). |
| </td> |
| </tr> |
| <tr> |
| <td> <b>MapType</b> </td> |
| <td> java.util.Map </td> |
| <td> |
| DataTypes.createMapType(<i>keyType</i>, <i>valueType</i>)<br /> |
| <b>Note:</b> The value of <i>valueContainsNull</i> will be <i>true</i>.<br /> |
| DataTypes.createMapType(<i>keyType</i>, <i>valueType</i>, <i>valueContainsNull</i>)<br /> |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructType</b> </td> |
| <td> org.apache.spark.sql.Row </td> |
| <td> |
| DataTypes.createStructType(<i>fields</i>)<br /> |
| <b>Note:</b> <i>fields</i> is a List or an array of StructFields. |
| Also, two fields with the same name are not allowed. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructField</b> </td> |
| <td> The value type in Java of the data type of this field |
| (For example, int for a StructField with the data type IntegerType) </td> |
| <td> |
| DataTypes.createStructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>) |
| </td> |
| </tr> |
| </table> |
| |
| </div> |
| |
| <div data-lang="python"> |
| |
| <p>All data types of Spark SQL are located in the package of <code>pyspark.sql.types</code>. |
| You can access them by doing</p> |
| |
| <div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="o">*</span></code></pre></div> |
| |
| <table class="table"> |
| <tr> |
| <th style="width:20%">Data type</th> |
| <th style="width:40%">Value type in Python</th> |
| <th>API to access or create a data type</th></tr> |
| <tr> |
| <td> <b>ByteType</b> </td> |
| <td> |
| int or long <br /> |
| <b>Note:</b> Numbers will be converted to 1-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of -128 to 127. |
| </td> |
| <td> |
| ByteType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ShortType</b> </td> |
| <td> |
| int or long <br /> |
| <b>Note:</b> Numbers will be converted to 2-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of -32768 to 32767. |
| </td> |
| <td> |
| ShortType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>IntegerType</b> </td> |
| <td> int or long </td> |
| <td> |
| IntegerType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>LongType</b> </td> |
| <td> |
| long <br /> |
| <b>Note:</b> Numbers will be converted to 8-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of |
| -9223372036854775808 to 9223372036854775807. |
| Otherwise, please convert data to decimal.Decimal and use DecimalType. |
| </td> |
| <td> |
| LongType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>FloatType</b> </td> |
| <td> |
| float <br /> |
| <b>Note:</b> Numbers will be converted to 4-byte single-precision floating |
| point numbers at runtime. |
| </td> |
| <td> |
| FloatType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DoubleType</b> </td> |
| <td> float </td> |
| <td> |
| DoubleType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DecimalType</b> </td> |
| <td> decimal.Decimal </td> |
| <td> |
| DecimalType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StringType</b> </td> |
| <td> string </td> |
| <td> |
| StringType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BinaryType</b> </td> |
| <td> bytearray </td> |
| <td> |
| BinaryType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BooleanType</b> </td> |
| <td> bool </td> |
| <td> |
| BooleanType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>TimestampType</b> </td> |
| <td> datetime.datetime </td> |
| <td> |
| TimestampType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DateType</b> </td> |
| <td> datetime.date </td> |
| <td> |
| DateType() |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ArrayType</b> </td> |
| <td> list, tuple, or array </td> |
| <td> |
| ArrayType(<i>elementType</i>, [<i>containsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>containsNull</i> is <i>True</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>MapType</b> </td> |
| <td> dict </td> |
| <td> |
| MapType(<i>keyType</i>, <i>valueType</i>, [<i>valueContainsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>True</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructType</b> </td> |
| <td> list or tuple </td> |
| <td> |
| StructType(<i>fields</i>)<br /> |
| <b>Note:</b> <i>fields</i> is a Seq of StructFields. Also, two fields with the same |
| name are not allowed. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructField</b> </td> |
| <td> The value type in Python of the data type of this field |
  (For example, int for a StructField with the data type IntegerType) </td>
| <td> |
| StructField(<i>name</i>, <i>dataType</i>, <i>nullable</i>) |
| </td> |
| </tr> |
| </table> |
| |
| </div> |
| |
| <div data-lang="r"> |
| |
| <table class="table"> |
| <tr> |
| <th style="width:20%">Data type</th> |
| <th style="width:40%">Value type in R</th> |
| <th>API to access or create a data type</th></tr> |
| <tr> |
| <td> <b>ByteType</b> </td> |
| <td> |
| integer <br /> |
| <b>Note:</b> Numbers will be converted to 1-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of -128 to 127. |
| </td> |
| <td> |
| "byte" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ShortType</b> </td> |
| <td> |
| integer <br /> |
| <b>Note:</b> Numbers will be converted to 2-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of -32768 to 32767. |
| </td> |
| <td> |
| "short" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>IntegerType</b> </td> |
| <td> integer </td> |
| <td> |
| "integer" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>LongType</b> </td> |
| <td> |
| integer <br /> |
| <b>Note:</b> Numbers will be converted to 8-byte signed integer numbers at runtime. |
| Please make sure that numbers are within the range of |
| -9223372036854775808 to 9223372036854775807. |
| </td> |
| <td> |
| "long" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>FloatType</b> </td> |
| <td> |
| numeric <br /> |
| <b>Note:</b> Numbers will be converted to 4-byte single-precision floating |
| point numbers at runtime. |
| </td> |
| <td> |
| "float" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DoubleType</b> </td> |
| <td> numeric </td> |
| <td> |
| "double" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DecimalType</b> </td> |
| <td> Not supported </td> |
| <td> |
| Not supported |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StringType</b> </td> |
| <td> character </td> |
| <td> |
| "string" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BinaryType</b> </td> |
| <td> raw </td> |
| <td> |
| "binary" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>BooleanType</b> </td> |
| <td> logical </td> |
| <td> |
| "bool" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>TimestampType</b> </td> |
| <td> POSIXct </td> |
| <td> |
| "timestamp" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>DateType</b> </td> |
| <td> Date </td> |
| <td> |
| "date" |
| </td> |
| </tr> |
| <tr> |
| <td> <b>ArrayType</b> </td> |
| <td> vector or list </td> |
| <td> |
| list(type="array", elementType=<i>elementType</i>, containsNull=[<i>containsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>containsNull</i> is <i>True</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>MapType</b> </td> |
| <td> environment </td> |
| <td> |
| list(type="map", keyType=<i>keyType</i>, valueType=<i>valueType</i>, valueContainsNull=[<i>valueContainsNull</i>])<br /> |
| <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>True</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructType</b> </td> |
| <td> named list</td> |
| <td> |
| list(type="struct", fields=<i>fields</i>)<br /> |
| <b>Note:</b> <i>fields</i> is a Seq of StructFields. Also, two fields with the same |
| name are not allowed. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>StructField</b> </td> |
| <td> The value type in R of the data type of this field |
| (For example, integer for a StructField with the data type IntegerType) </td> |
| <td> |
| list(name=<i>name</i>, type=<i>dataType</i>, nullable=<i>nullable</i>) |
| </td> |
| </tr> |
| </table> |
| |
| </div> |
| |
| </div> |
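
<p>As an illustration, a nested schema can be constructed programmatically from these types (a minimal sketch in Scala):</p>

<pre><code>import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("scores", ArrayType(DoubleType, containsNull = false)),
  StructField("attributes", MapType(StringType, StringType))
))
</code></pre>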
| |
| <h2 id="nan-semantics">NaN Semantics</h2> |
| |
<p>There is special handling for not-a-number (NaN) values when dealing with <code>float</code> or <code>double</code> types that
does not exactly match standard floating point semantics.
Specifically (see the sketch after this list):</p>
| |
| <ul> |
| <li>NaN = NaN returns true.</li> |
| <li>In aggregations all NaN values are grouped together.</li> |
| <li>NaN is treated as a normal value in join keys.</li> |
  <li>NaN is treated as larger than any other numeric value, so NaN values sort last in ascending order.</li>
| </ul> |
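
<p>A minimal sketch illustrating the grouping behavior (assuming a <code>SQLContext</code> named <code>sqlContext</code> with its implicits imported):</p>

<pre><code>import sqlContext.implicits._

val df = Seq(("a", 1.0), ("b", Double.NaN), ("c", Double.NaN)).toDF("id", "value")

// Both NaN values fall into the same group, so the NaN row reports a count of 2
df.groupBy("value").count().show()
</code></pre>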
| |
| |
| </div> <!-- /container --> |
| |
| <script src="js/vendor/jquery-1.8.0.min.js"></script> |
| <script src="js/vendor/bootstrap.min.js"></script> |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function(d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function(){ |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], |
| displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |