<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Parquet Files - Spark 2.4.1 Documentation</title>
<link rel="stylesheet" href="css/bootstrap.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<meta name="viewport" content="width=device-width">
<link rel="stylesheet" href="css/bootstrap-responsive.min.css">
<link rel="stylesheet" href="css/main.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
<!-- Google analytics script -->
<script type="text/javascript">
var _gaq = _gaq || [];
_gaq.push(['_setAccount', 'UA-32518208-2']);
_gaq.push(['_trackPageview']);
(function() {
var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
})();
</script>
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<div class="navbar navbar-fixed-top" id="topbar">
<div class="navbar-inner">
<div class="container">
<div class="brand"><a href="index.html">
<img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">2.4.1</span>
</div>
<ul class="nav">
<!--TODO(andyk): Add class="active" attribute to li some how.-->
<li><a href="index.html">Overview</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="quick-start.html">Quick Start</a></li>
<li><a href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a></li>
<li><a href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a></li>
<li><a href="structured-streaming-programming-guide.html">Structured Streaming</a></li>
<li><a href="streaming-programming-guide.html">Spark Streaming (DStreams)</a></li>
<li><a href="ml-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
<li><a href="sparkr.html">SparkR (R on Spark)</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
<li><a href="api/java/index.html">Java</a></li>
<li><a href="api/python/index.html">Python</a></li>
<li><a href="api/R/index.html">R</a></li>
<li><a href="api/sql/index.html">SQL, Built-in Functions</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="cluster-overview.html">Overview</a></li>
<li><a href="submitting-applications.html">Submitting Applications</a></li>
<li class="divider"></li>
<li><a href="spark-standalone.html">Spark Standalone</a></li>
<li><a href="running-on-mesos.html">Mesos</a></li>
<li><a href="running-on-yarn.html">YARN</a></li>
<li><a href="running-on-kubernetes.html">Kubernetes</a></li>
</ul>
</li>
<li class="dropdown">
<a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="configuration.html">Configuration</a></li>
<li><a href="monitoring.html">Monitoring</a></li>
<li><a href="tuning.html">Tuning Guide</a></li>
<li><a href="job-scheduling.html">Job Scheduling</a></li>
<li><a href="security.html">Security</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li class="divider"></li>
<li><a href="building-spark.html">Building Spark</a></li>
<li><a href="https://spark.apache.org/contributing.html">Contributing to Spark</a></li>
<li><a href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a></li>
</ul>
</li>
</ul>
<!--<p class="navbar-text pull-right"><span class="version-text">v2.4.1</span></p>-->
</div>
</div>
</div>
<div class="container-wrapper">
<div class="left-menu-wrapper">
<div class="left-menu">
<h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3>
<ul>
<li>
<a href="sql-getting-started.html">
Getting Started
</a>
</li>
<li>
<a href="sql-data-sources.html">
Data Sources
</a>
</li>
<ul>
<li>
<a href="sql-data-sources-load-save-functions.html">
Generic Load/Save Functions
</a>
</li>
<li>
<a href="sql-data-sources-parquet.html">
<b>Parquet Files</b>
</a>
</li>
<li>
<a href="sql-data-sources-orc.html">
ORC Files
</a>
</li>
<li>
<a href="sql-data-sources-json.html">
JSON Files
</a>
</li>
<li>
<a href="sql-data-sources-hive-tables.html">
Hive Tables
</a>
</li>
<li>
<a href="sql-data-sources-jdbc.html">
JDBC To Other Databases
</a>
</li>
<li>
<a href="sql-data-sources-avro.html">
Avro Files
</a>
</li>
<li>
<a href="sql-data-sources-troubleshooting.html">
Troubleshooting
</a>
</li>
</ul>
<li>
<a href="sql-performance-tuning.html">
Performance Tuning
</a>
</li>
<li>
<a href="sql-distributed-sql-engine.html">
Distributed SQL Engine
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html">
PySpark Usage Guide for Pandas with Apache Arrow
</a>
</li>
<li>
<a href="sql-migration-guide.html">
Migration Guide
</a>
</li>
<li>
<a href="sql-reference.html">
Reference
</a>
</li>
</ul>
</div>
</div>
<input id="nav-trigger" class="nav-trigger" checked type="checkbox">
<label for="nav-trigger"></label>
<div class="content-with-sidebar" id="content">
<h1 class="title">Parquet Files</h1>
<ul id="markdown-toc">
<li><a href="#loading-data-programmatically" id="markdown-toc-loading-data-programmatically">Loading Data Programmatically</a></li>
<li><a href="#partition-discovery" id="markdown-toc-partition-discovery">Partition Discovery</a></li>
<li><a href="#schema-merging" id="markdown-toc-schema-merging">Schema Merging</a></li>
<li><a href="#hive-metastore-parquet-table-conversion" id="markdown-toc-hive-metastore-parquet-table-conversion">Hive metastore Parquet table conversion</a> <ul>
<li><a href="#hiveparquet-schema-reconciliation" id="markdown-toc-hiveparquet-schema-reconciliation">Hive/Parquet Schema Reconciliation</a></li>
<li><a href="#metadata-refreshing" id="markdown-toc-metadata-refreshing">Metadata Refreshing</a></li>
</ul>
</li>
<li><a href="#configuration" id="markdown-toc-configuration">Configuration</a></li>
</ul>
<p><a href="http://parquet.io">Parquet</a> is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
of the original data. When writing Parquet files, all columns are automatically converted to be nullable for
compatibility reasons.</p>
<h3 id="loading-data-programmatically">Loading Data Programmatically</h3>
<p>Using the data from the above example:</p>
<div class="codetabs">
<div data-lang="scala">
<div class="highlight"><pre><span></span><span class="c1">// Encoders for most common types are automatically provided by importing spark.implicits._</span>
<span class="k">import</span> <span class="nn">spark.implicits._</span>
<span class="k">val</span> <span class="n">peopleDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/people.json&quot;</span><span class="o">)</span>
<span class="c1">// DataFrames can be saved as Parquet files, maintaining the schema information</span>
<span class="n">peopleDF</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="o">(</span><span class="s">&quot;people.parquet&quot;</span><span class="o">)</span>
<span class="c1">// Read in the parquet file created above</span>
<span class="c1">// Parquet files are self-describing so the schema is preserved</span>
<span class="c1">// The result of loading a Parquet file is also a DataFrame</span>
<span class="k">val</span> <span class="n">parquetFileDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">parquet</span><span class="o">(</span><span class="s">&quot;people.parquet&quot;</span><span class="o">)</span>
<span class="c1">// Parquet files can also be used to create a temporary view and then used in SQL statements</span>
<span class="n">parquetFileDF</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;parquetFile&quot;</span><span class="o">)</span>
<span class="k">val</span> <span class="n">namesDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19&quot;</span><span class="o">)</span>
<span class="n">namesDF</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">attributes</span> <span class="k">=&gt;</span> <span class="s">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">attributes</span><span class="o">(</span><span class="mi">0</span><span class="o">)).</span><span class="n">show</span><span class="o">()</span>
<span class="c1">// +------------+</span>
<span class="c1">// | value|</span>
<span class="c1">// +------------+</span>
<span class="c1">// |Name: Justin|</span>
<span class="c1">// +------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.MapFunction</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Encoders</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">peopleDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">json</span><span class="o">(</span><span class="s">&quot;examples/src/main/resources/people.json&quot;</span><span class="o">);</span>
<span class="c1">// DataFrames can be saved as Parquet files, maintaining the schema information</span>
<span class="n">peopleDF</span><span class="o">.</span><span class="na">write</span><span class="o">().</span><span class="na">parquet</span><span class="o">(</span><span class="s">&quot;people.parquet&quot;</span><span class="o">);</span>
<span class="c1">// Read in the Parquet file created above.</span>
<span class="c1">// Parquet files are self-describing so the schema is preserved</span>
<span class="c1">// The result of loading a parquet file is also a DataFrame</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">parquetFileDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">parquet</span><span class="o">(</span><span class="s">&quot;people.parquet&quot;</span><span class="o">);</span>
<span class="c1">// Parquet files can also be used to create a temporary view and then used in SQL statements</span>
<span class="n">parquetFileDF</span><span class="o">.</span><span class="na">createOrReplaceTempView</span><span class="o">(</span><span class="s">&quot;parquetFile&quot;</span><span class="o">);</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">namesDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">(</span><span class="s">&quot;SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19&quot;</span><span class="o">);</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">namesDS</span> <span class="o">=</span> <span class="n">namesDF</span><span class="o">.</span><span class="na">map</span><span class="o">(</span>
<span class="o">(</span><span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;)</span> <span class="n">row</span> <span class="o">-&gt;</span> <span class="s">&quot;Name: &quot;</span> <span class="o">+</span> <span class="n">row</span><span class="o">.</span><span class="na">getString</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span>
<span class="n">Encoders</span><span class="o">.</span><span class="na">STRING</span><span class="o">());</span>
<span class="n">namesDS</span><span class="o">.</span><span class="na">show</span><span class="o">();</span>
<span class="c1">// +------------+</span>
<span class="c1">// | value|</span>
<span class="c1">// +------------+</span>
<span class="c1">// |Name: Justin|</span>
<span class="c1">// +------------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small></div>
</div>
<div data-lang="python">
<div class="highlight"><pre><span></span><span class="n">peopleDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">json</span><span class="p">(</span><span class="s2">&quot;examples/src/main/resources/people.json&quot;</span><span class="p">)</span>
<span class="c1"># DataFrames can be saved as Parquet files, maintaining the schema information.</span>
<span class="n">peopleDF</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s2">&quot;people.parquet&quot;</span><span class="p">)</span>
<span class="c1"># Read in the Parquet file created above.</span>
<span class="c1"># Parquet files are self-describing so the schema is preserved.</span>
<span class="c1"># The result of loading a parquet file is also a DataFrame.</span>
<span class="n">parquetFile</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s2">&quot;people.parquet&quot;</span><span class="p">)</span>
<span class="c1"># Parquet files can also be used to create a temporary view and then used in SQL statements.</span>
<span class="n">parquetFile</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="p">(</span><span class="s2">&quot;parquetFile&quot;</span><span class="p">)</span>
<span class="n">teenagers</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span><span class="s2">&quot;SELECT name FROM parquetFile WHERE age &gt;= 13 AND age &lt;= 19&quot;</span><span class="p">)</span>
<span class="n">teenagers</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
<span class="c1"># +------+</span>
<span class="c1"># | name|</span>
<span class="c1"># +------+</span>
<span class="c1"># |Justin|</span>
<span class="c1"># +------+</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small></div>
</div>
<div data-lang="r">
<div class="highlight"><pre><span></span>df <span class="o">&lt;-</span> read.df<span class="p">(</span><span class="s">&quot;examples/src/main/resources/people.json&quot;</span><span class="p">,</span> <span class="s">&quot;json&quot;</span><span class="p">)</span>
<span class="c1"># SparkDataFrame can be saved as Parquet files, maintaining the schema information.</span>
write.parquet<span class="p">(</span>df<span class="p">,</span> <span class="s">&quot;people.parquet&quot;</span><span class="p">)</span>
<span class="c1"># Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.</span>
<span class="c1"># The result of loading a parquet file is also a DataFrame.</span>
parquetFile <span class="o">&lt;-</span> read.parquet<span class="p">(</span><span class="s">&quot;people.parquet&quot;</span><span class="p">)</span>
<span class="c1"># Parquet files can also be used to create a temporary view and then used in SQL statements.</span>
createOrReplaceTempView<span class="p">(</span>parquetFile<span class="p">,</span> <span class="s">&quot;parquetFile&quot;</span><span class="p">)</span>
teenagers <span class="o">&lt;-</span> sql<span class="p">(</span><span class="s">&quot;SELECT name FROM parquetFile WHERE age &gt;= 13 AND age &lt;= 19&quot;</span><span class="p">)</span>
<span class="kp">head</span><span class="p">(</span>teenagers<span class="p">)</span>
<span class="c1">## name</span>
<span class="c1">## 1 Justin</span>
<span class="c1"># We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with &quot;Name:&quot;</span>
schema <span class="o">&lt;-</span> structType<span class="p">(</span>structField<span class="p">(</span><span class="s">&quot;name&quot;</span><span class="p">,</span> <span class="s">&quot;string&quot;</span><span class="p">))</span>
teenNames <span class="o">&lt;-</span> dapply<span class="p">(</span>df<span class="p">,</span> <span class="kr">function</span><span class="p">(</span>p<span class="p">)</span> <span class="p">{</span> <span class="kp">cbind</span><span class="p">(</span><span class="kp">paste</span><span class="p">(</span><span class="s">&quot;Name:&quot;</span><span class="p">,</span> p<span class="o">$</span>name<span class="p">))</span> <span class="p">},</span> schema<span class="p">)</span>
<span class="kr">for</span> <span class="p">(</span>teenName <span class="kr">in</span> collect<span class="p">(</span>teenNames<span class="p">)</span><span class="o">$</span>name<span class="p">)</span> <span class="p">{</span>
<span class="kp">cat</span><span class="p">(</span>teenName<span class="p">,</span> <span class="s">&quot;\n&quot;</span><span class="p">)</span>
<span class="p">}</span>
<span class="c1">## Name: Michael</span>
<span class="c1">## Name: Andy</span>
<span class="c1">## Name: Justin</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div>
</div>
<div data-lang="sql">
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span></span><span class="k">CREATE</span> <span class="k">TEMPORARY</span> <span class="k">VIEW</span> <span class="n">parquetTable</span>
<span class="k">USING</span> <span class="n">org</span><span class="p">.</span><span class="n">apache</span><span class="p">.</span><span class="n">spark</span><span class="p">.</span><span class="k">sql</span><span class="p">.</span><span class="n">parquet</span>
<span class="k">OPTIONS</span> <span class="p">(</span>
<span class="n">path</span> <span class="ss">&quot;examples/src/main/resources/people.parquet&quot;</span>
<span class="p">)</span>
<span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">parquetTable</span></code></pre></figure>
</div>
</div>
<h3 id="partition-discovery">Partition Discovery</h3>
<p>Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
table, data are usually stored in different directories, with partitioning column values encoded in
the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet)
are able to discover and infer partitioning information automatically.
For example, we can store all our previously used
population data into a partitioned table using the following directory structure, with two extra
columns, <code>gender</code> and <code>country</code>, as partitioning columns:</p>
<figure class="highlight"><pre><code class="language-text" data-lang="text"><span></span>path
└── to
└── table
├── gender=male
│   ├── ...
│   │
│   ├── country=US
│   │   └── data.parquet
│   ├── country=CN
│   │   └── data.parquet
│   └── ...
└── gender=female
   ├── ...
   │
   ├── country=US
   │   └── data.parquet
   ├── country=CN
   │   └── data.parquet
   └── ...</code></pre></figure>
<p>By passing <code>path/to/table</code> to either <code>SparkSession.read.parquet</code> or <code>SparkSession.read.load</code>, Spark SQL
will automatically extract the partitioning information from the paths.
Now the schema of the returned DataFrame becomes:</p>
<figure class="highlight"><pre><code class="language-text" data-lang="text"><span></span>root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)</code></pre></figure>
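<p>As a minimal sketch (assuming <code>spark</code> is an existing <code>SparkSession</code> and the directory layout above), the read that yields this schema is simply:</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// gender and country are discovered from the directory names under the base path
val peopleDF = spark.read.parquet(&quot;path/to/table&quot;)
peopleDF.printSchema()</code></pre></figure>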
<p>Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric, date, timestamp, and string types are supported. Sometimes users may not want
to automatically infer the data types of the partitioning columns. For these use cases, the
automatic type inference can be configured by
<code>spark.sql.sources.partitionColumnTypeInference.enabled</code>, which defaults to <code>true</code>. When type
inference is disabled, string type will be used for the partitioning columns.</p>
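<p>For instance, a minimal sketch of disabling the inference so that partition columns come back as strings (assuming <code>spark</code> is an existing <code>SparkSession</code>):</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Keep partition column values as plain strings instead of inferring numeric/date types
spark.conf.set(&quot;spark.sql.sources.partitionColumnTypeInference.enabled&quot;, &quot;false&quot;)
val df = spark.read.parquet(&quot;path/to/table&quot;)  // gender and country remain string columns</code></pre></figure>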
<p>Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths
by default. For the above example, if users pass <code>path/to/table/gender=male</code> to either
<code>SparkSession.read.parquet</code> or <code>SparkSession.read.load</code>, <code>gender</code> will not be considered a
partitioning column. If users need to specify the base path that partition discovery
should start with, they can set <code>basePath</code> in the data source options. For example,
when <code>path/to/table/gender=male</code> is the path of the data and
users set <code>basePath</code> to <code>path/to/table/</code>, <code>gender</code> will be a partitioning column.</p>
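<p>A minimal sketch of the <code>basePath</code> option described above (paths taken from the example layout, <code>spark</code> assumed to be an existing <code>SparkSession</code>):</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Read only the gender=male branch, but keep gender as a partitioning column
val maleDF = spark.read
  .option(&quot;basePath&quot;, &quot;path/to/table/&quot;)
  .parquet(&quot;path/to/table/gender=male&quot;)</code></pre></figure>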
<h3 id="schema-merging">Schema Merging</h3>
<p>Like Protocol Buffers, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
source is now able to automatically detect this case and merge schemas of all these files.</p>
<p>Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we
turned it off by default starting from Spark 1.5.0. You may enable it by</p>
<ol>
<li>setting data source option <code>mergeSchema</code> to <code>true</code> when reading Parquet files (as shown in the
examples below), or</li>
<li>setting the global SQL option <code>spark.sql.parquet.mergeSchema</code> to <code>true</code>.</li>
</ol>
<div class="codetabs">
<div data-lang="scala">
<div class="highlight"><pre><span></span><span class="c1">// This is used to implicitly convert an RDD to a DataFrame.</span>
<span class="k">import</span> <span class="nn">spark.implicits._</span>
<span class="c1">// Create a simple DataFrame, store into a partition directory</span>
<span class="k">val</span> <span class="n">squaresDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">makeRDD</span><span class="o">(</span><span class="mi">1</span> <span class="n">to</span> <span class="mi">5</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">i</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">i</span> <span class="o">*</span> <span class="n">i</span><span class="o">)).</span><span class="n">toDF</span><span class="o">(</span><span class="s">&quot;value&quot;</span><span class="o">,</span> <span class="s">&quot;square&quot;</span><span class="o">)</span>
<span class="n">squaresDF</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="o">(</span><span class="s">&quot;data/test_table/key=1&quot;</span><span class="o">)</span>
<span class="c1">// Create another DataFrame in a new partition directory,</span>
<span class="c1">// adding a new column and dropping an existing column</span>
<span class="k">val</span> <span class="n">cubesDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">makeRDD</span><span class="o">(</span><span class="mi">6</span> <span class="n">to</span> <span class="mi">10</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">i</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">i</span> <span class="o">*</span> <span class="n">i</span> <span class="o">*</span> <span class="n">i</span><span class="o">)).</span><span class="n">toDF</span><span class="o">(</span><span class="s">&quot;value&quot;</span><span class="o">,</span> <span class="s">&quot;cube&quot;</span><span class="o">)</span>
<span class="n">cubesDF</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="o">(</span><span class="s">&quot;data/test_table/key=2&quot;</span><span class="o">)</span>
<span class="c1">// Read the partitioned table</span>
<span class="k">val</span> <span class="n">mergedDF</span> <span class="k">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">option</span><span class="o">(</span><span class="s">&quot;mergeSchema&quot;</span><span class="o">,</span> <span class="s">&quot;true&quot;</span><span class="o">).</span><span class="n">parquet</span><span class="o">(</span><span class="s">&quot;data/test_table&quot;</span><span class="o">)</span>
<span class="n">mergedDF</span><span class="o">.</span><span class="n">printSchema</span><span class="o">()</span>
<span class="c1">// The final schema consists of all 3 columns in the Parquet files together</span>
<span class="c1">// with the partitioning column appeared in the partition directory paths</span>
<span class="c1">// root</span>
<span class="c1">// |-- value: int (nullable = true)</span>
<span class="c1">// |-- square: int (nullable = true)</span>
<span class="c1">// |-- cube: int (nullable = true)</span>
<span class="c1">// |-- key: int (nullable = true)</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.</small></div>
</div>
<div data-lang="java">
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">java.io.Serializable</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.ArrayList</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Arrays</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Dataset</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.sql.Row</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Square</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">value</span><span class="o">;</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">square</span><span class="o">;</span>
<span class="c1">// Getters and setters...</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Cube</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">value</span><span class="o">;</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">cube</span><span class="o">;</span>
<span class="c1">// Getters and setters...</span>
<span class="o">}</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Square</span><span class="o">&gt;</span> <span class="n">squares</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;&gt;();</span>
<span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">value</span> <span class="o">=</span> <span class="mi">1</span><span class="o">;</span> <span class="n">value</span> <span class="o">&lt;=</span> <span class="mi">5</span><span class="o">;</span> <span class="n">value</span><span class="o">++)</span> <span class="o">{</span>
<span class="n">Square</span> <span class="n">square</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Square</span><span class="o">();</span>
<span class="n">square</span><span class="o">.</span><span class="na">setValue</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
<span class="n">square</span><span class="o">.</span><span class="na">setSquare</span><span class="o">(</span><span class="n">value</span> <span class="o">*</span> <span class="n">value</span><span class="o">);</span>
<span class="n">squares</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">square</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Create a simple DataFrame, store into a partition directory</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">squaresDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">squares</span><span class="o">,</span> <span class="n">Square</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">squaresDF</span><span class="o">.</span><span class="na">write</span><span class="o">().</span><span class="na">parquet</span><span class="o">(</span><span class="s">&quot;data/test_table/key=1&quot;</span><span class="o">);</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Cube</span><span class="o">&gt;</span> <span class="n">cubes</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;&gt;();</span>
<span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">value</span> <span class="o">=</span> <span class="mi">6</span><span class="o">;</span> <span class="n">value</span> <span class="o">&lt;=</span> <span class="mi">10</span><span class="o">;</span> <span class="n">value</span><span class="o">++)</span> <span class="o">{</span>
<span class="n">Cube</span> <span class="n">cube</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Cube</span><span class="o">();</span>
<span class="n">cube</span><span class="o">.</span><span class="na">setValue</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
<span class="n">cube</span><span class="o">.</span><span class="na">setCube</span><span class="o">(</span><span class="n">value</span> <span class="o">*</span> <span class="n">value</span> <span class="o">*</span> <span class="n">value</span><span class="o">);</span>
<span class="n">cubes</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">cube</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Create another DataFrame in a new partition directory,</span>
<span class="c1">// adding a new column and dropping an existing column</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">cubesDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">createDataFrame</span><span class="o">(</span><span class="n">cubes</span><span class="o">,</span> <span class="n">Cube</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">cubesDF</span><span class="o">.</span><span class="na">write</span><span class="o">().</span><span class="na">parquet</span><span class="o">(</span><span class="s">&quot;data/test_table/key=2&quot;</span><span class="o">);</span>
<span class="c1">// Read the partitioned table</span>
<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Row</span><span class="o">&gt;</span> <span class="n">mergedDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">option</span><span class="o">(</span><span class="s">&quot;mergeSchema&quot;</span><span class="o">,</span> <span class="kc">true</span><span class="o">).</span><span class="na">parquet</span><span class="o">(</span><span class="s">&quot;data/test_table&quot;</span><span class="o">);</span>
<span class="n">mergedDF</span><span class="o">.</span><span class="na">printSchema</span><span class="o">();</span>
<span class="c1">// The final schema consists of all 3 columns in the Parquet files together</span>
<span class="c1">// with the partitioning column appeared in the partition directory paths</span>
<span class="c1">// root</span>
<span class="c1">// |-- value: int (nullable = true)</span>
<span class="c1">// |-- square: int (nullable = true)</span>
<span class="c1">// |-- cube: int (nullable = true)</span>
<span class="c1">// |-- key: int (nullable = true)</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.</small></div>
</div>
<div data-lang="python">
<div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">Row</span>
<span class="c1"># spark is from the previous example.</span>
<span class="c1"># Create a simple DataFrame, stored into a partition directory</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sparkContext</span>
<span class="n">squaresDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">6</span><span class="p">))</span>
<span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">single</span><span class="o">=</span><span class="n">i</span><span class="p">,</span> <span class="n">double</span><span class="o">=</span><span class="n">i</span> <span class="o">**</span> <span class="mi">2</span><span class="p">)))</span>
<span class="n">squaresDF</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s2">&quot;data/test_table/key=1&quot;</span><span class="p">)</span>
<span class="c1"># Create another DataFrame in a new partition directory,</span>
<span class="c1"># adding a new column and dropping an existing column</span>
<span class="n">cubesDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="mi">6</span><span class="p">,</span> <span class="mi">11</span><span class="p">))</span>
<span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">:</span> <span class="n">Row</span><span class="p">(</span><span class="n">single</span><span class="o">=</span><span class="n">i</span><span class="p">,</span> <span class="n">triple</span><span class="o">=</span><span class="n">i</span> <span class="o">**</span> <span class="mi">3</span><span class="p">)))</span>
<span class="n">cubesDF</span><span class="o">.</span><span class="n">write</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s2">&quot;data/test_table/key=2&quot;</span><span class="p">)</span>
<span class="c1"># Read the partitioned table</span>
<span class="n">mergedDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">option</span><span class="p">(</span><span class="s2">&quot;mergeSchema&quot;</span><span class="p">,</span> <span class="s2">&quot;true&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">parquet</span><span class="p">(</span><span class="s2">&quot;data/test_table&quot;</span><span class="p">)</span>
<span class="n">mergedDF</span><span class="o">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="c1"># The final schema consists of all 3 columns in the Parquet files together</span>
<span class="c1"># with the partitioning column appeared in the partition directory paths.</span>
<span class="c1"># root</span>
<span class="c1"># |-- double: long (nullable = true)</span>
<span class="c1"># |-- single: long (nullable = true)</span>
<span class="c1"># |-- triple: long (nullable = true)</span>
<span class="c1"># |-- key: integer (nullable = true)</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.</small></div>
</div>
<div data-lang="r">
<div class="highlight"><pre><span></span>df1 <span class="o">&lt;-</span> createDataFrame<span class="p">(</span><span class="kt">data.frame</span><span class="p">(</span>single<span class="o">=</span><span class="kt">c</span><span class="p">(</span><span class="m">12</span><span class="p">,</span> <span class="m">29</span><span class="p">),</span> <span class="kt">double</span><span class="o">=</span><span class="kt">c</span><span class="p">(</span><span class="m">19</span><span class="p">,</span> <span class="m">23</span><span class="p">)))</span>
df2 <span class="o">&lt;-</span> createDataFrame<span class="p">(</span><span class="kt">data.frame</span><span class="p">(</span><span class="kt">double</span><span class="o">=</span><span class="kt">c</span><span class="p">(</span><span class="m">19</span><span class="p">,</span> <span class="m">23</span><span class="p">),</span> triple<span class="o">=</span><span class="kt">c</span><span class="p">(</span><span class="m">23</span><span class="p">,</span> <span class="m">18</span><span class="p">)))</span>
<span class="c1"># Create a simple DataFrame, stored into a partition directory</span>
write.df<span class="p">(</span>df1<span class="p">,</span> <span class="s">&quot;data/test_table/key=1&quot;</span><span class="p">,</span> <span class="s">&quot;parquet&quot;</span><span class="p">,</span> <span class="s">&quot;overwrite&quot;</span><span class="p">)</span>
<span class="c1"># Create another DataFrame in a new partition directory,</span>
<span class="c1"># adding a new column and dropping an existing column</span>
write.df<span class="p">(</span>df2<span class="p">,</span> <span class="s">&quot;data/test_table/key=2&quot;</span><span class="p">,</span> <span class="s">&quot;parquet&quot;</span><span class="p">,</span> <span class="s">&quot;overwrite&quot;</span><span class="p">)</span>
<span class="c1"># Read the partitioned table</span>
df3 <span class="o">&lt;-</span> read.df<span class="p">(</span><span class="s">&quot;data/test_table&quot;</span><span class="p">,</span> <span class="s">&quot;parquet&quot;</span><span class="p">,</span> mergeSchema <span class="o">=</span> <span class="s">&quot;true&quot;</span><span class="p">)</span>
printSchema<span class="p">(</span>df3<span class="p">)</span>
<span class="c1"># The final schema consists of all 3 columns in the Parquet files together</span>
<span class="c1"># with the partitioning column appeared in the partition directory paths</span>
<span class="c1">## root</span>
<span class="c1">## |-- single: double (nullable = true)</span>
<span class="c1">## |-- double: double (nullable = true)</span>
<span class="c1">## |-- triple: double (nullable = true)</span>
<span class="c1">## |-- key: integer (nullable = true)</span>
</pre></div>
<div><small>Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.</small></div>
</div>
</div>
<h3 id="hive-metastore-parquet-table-conversion">Hive metastore Parquet table conversion</h3>
<p>When reading from and writing to Hive metastore Parquet tables, Spark SQL will try to use its own
Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the
<code>spark.sql.hive.convertMetastoreParquet</code> configuration, and is turned on by default.</p>
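<p>For example, a short sketch of turning the conversion off and falling back to the Hive SerDe (assuming <code>spark</code> is an existing <code>SparkSession</code>):</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Use the Hive SerDe instead of Spark's built-in Parquet support for metastore tables
spark.conf.set(&quot;spark.sql.hive.convertMetastoreParquet&quot;, &quot;false&quot;)</code></pre></figure>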
<h4 id="hiveparquet-schema-reconciliation">Hive/Parquet Schema Reconciliation</h4>
<p>There are two key differences between Hive and Parquet from the perspective of table schema
processing.</p>
<ol>
<li>Hive is case insensitive, while Parquet is not</li>
<li>Hive considers all columns nullable, while nullability in Parquet is significant</li>
</ol>
<p>For this reason, we must reconcile the Hive metastore schema with the Parquet schema when converting a
Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:</p>
<ol>
<li>
<p>Fields that have the same name in both schemas must have the same data type regardless of
nullability. The reconciled field should have the data type of the Parquet side, so that
nullability is respected.</p>
</li>
<li>
<p>The reconciled schema contains exactly those fields defined in the Hive metastore schema.</p>
<ul>
<li>Any fields that only appear in the Parquet schema are dropped in the reconciled schema.</li>
<li>Any fields that only appear in the Hive metastore schema are added as nullable fields in the
reconciled schema.</li>
</ul>
</li>
</ol>
<h4 id="metadata-refreshing">Metadata Refreshing</h4>
<p>Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
conversion is enabled, the metadata of those converted tables is also cached. If these tables are
updated by Hive or other external tools, you need to refresh them manually to ensure consistent
metadata.</p>
<div class="codetabs">
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span></span><span class="c1">// spark is an existing SparkSession</span>
<span class="n">spark</span><span class="o">.</span><span class="n">catalog</span><span class="o">.</span><span class="n">refreshTable</span><span class="o">(</span><span class="s">&quot;my_table&quot;</span><span class="o">)</span></code></pre></figure>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="c1">// spark is an existing SparkSession</span>
<span class="n">spark</span><span class="o">.</span><span class="na">catalog</span><span class="o">().</span><span class="na">refreshTable</span><span class="o">(</span><span class="s">&quot;my_table&quot;</span><span class="o">);</span></code></pre></figure>
</div>
<div data-lang="python">
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span></span><span class="c1"># spark is an existing SparkSession</span>
<span class="n">spark</span><span class="o">.</span><span class="n">catalog</span><span class="o">.</span><span class="n">refreshTable</span><span class="p">(</span><span class="s2">&quot;my_table&quot;</span><span class="p">)</span></code></pre></figure>
</div>
<div data-lang="r">
<figure class="highlight"><pre><code class="language-r" data-lang="r"><span></span>refreshTable<span class="p">(</span><span class="s">&quot;my_table&quot;</span><span class="p">)</span></code></pre></figure>
</div>
<div data-lang="sql">
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span></span><span class="n">REFRESH</span> <span class="k">TABLE</span> <span class="n">my_table</span><span class="p">;</span></code></pre></figure>
</div>
</div>
<h3 id="configuration">Configuration</h3>
<p>Configuration of Parquet can be done using the <code>setConf</code> method on <code>SparkSession</code> or by running
<code>SET key=value</code> commands using SQL.</p>
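<p>For example, a minimal sketch of both forms, using <code>spark.conf.set</code> as the programmatic equivalent (assuming <code>spark</code> is an existing <code>SparkSession</code>):</p>
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Programmatic form
spark.conf.set(&quot;spark.sql.parquet.filterPushdown&quot;, &quot;true&quot;)
// Equivalent SQL form
spark.sql(&quot;SET spark.sql.parquet.compression.codec=gzip&quot;)</code></pre></figure>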
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.sql.parquet.binaryAsString</code></td>
<td>false</td>
<td>
Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do
not differentiate between binary data and strings when writing out the Parquet schema. This
flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.int96AsTimestamp</code></td>
<td>true</td>
<td>
Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This
flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.compression.codec</code></td>
<td>snappy</td>
<td>
Sets the compression codec used when writing Parquet files. If either <code>compression</code> or
<code>parquet.compression</code> is specified in the table-specific options/properties, the precedence would be
<code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>. Acceptable values include:
none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed before Hadoop 2.9.0, and <code>brotli</code> requires
<code>BrotliCodec</code> to be installed.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.filterPushdown</code></td>
<td>true</td>
<td>Enables Parquet filter push-down optimization when set to true.</td>
</tr>
<tr>
<td><code>spark.sql.hive.convertMetastoreParquet</code></td>
<td>true</td>
<td>
When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in
support.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.mergeSchema</code></td>
<td>false</td>
<td>
<p>
When true, the Parquet data source merges schemas collected from all data files, otherwise the
schema is picked from the summary file or a random data file if no summary file is available.
</p>
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.writeLegacyFormat</code></td>
<td>false</td>
<td>
If true, data will be written in the same way as Spark 1.4 and earlier. For example, decimal values
will be written in Apache Parquet's fixed-length byte array format, which other systems such as
Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For
example, decimals will be written in int-based format. If Parquet output is intended for use
with systems that do not support this newer format, set to true.
</td>
</tr>
</table>
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-1.8.0.min.js"></script>
<script src="js/vendor/bootstrap.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>