---
layout: global
title: Parquet Files
displayTitle: Parquet Files
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* Table of contents
{:toc}

[Parquet](https://parquet.apache.org) is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
of the original data. When reading Parquet files, all columns are automatically converted to be nullable for
compatibility reasons.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">

{% include_example basic_parquet_example python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">

{% include_example basic_parquet_example r/RSparkSQLExample.R %}

</div>

<div data-lang="SQL" markdown="1">

{% highlight sql %}

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

{% endhighlight %}

</div>

</div>

### Partition Discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
table, data are usually stored in different directories, with partitioning column values encoded in
the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet)
are able to discover and infer partitioning information automatically.
For example, we can store all our previously used
population data into a partitioned table using the following directory structure, with two extra
columns, `gender` and `country`, as partitioning columns:

{% highlight text %}

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

{% endhighlight %}

By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL
will automatically extract the partitioning information from the paths.
Now the schema of the returned DataFrame becomes:

{% highlight text %}

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

{% endhighlight %}

Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types, date, timestamp and string types are supported. Sometimes users may not want
to automatically infer the data types of the partitioning columns. For these use cases, the
automatic type inference can be configured by
`spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to `true`. When type
inference is disabled, string type will be used for the partitioning columns.
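
The following is a minimal PySpark sketch of these two behaviors; it assumes the partitioned
`path/to/table` layout from the example above actually exists on disk:

{% highlight python %}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-discovery").getOrCreate()

# The partitioning columns (gender, country) are discovered from the directory
# names and their data types are inferred automatically.
df = spark.read.parquet("path/to/table")
df.printSchema()

# Optionally turn type inference off, in which case the partitioning columns
# are always read back as strings.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
df_as_strings = spark.read.parquet("path/to/table")
{% endhighlight %}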

Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths
by default. For the above example, if users pass `path/to/table/gender=male` to either
`SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered a
partitioning column. If users need to specify the base path that partition discovery
should start with, they can set `basePath` in the data source options. For example,
when `path/to/table/gender=male` is the path of the data and
users set `basePath` to `path/to/table/`, `gender` will be a partitioning column.
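
A minimal PySpark sketch of the `basePath` behavior described above (again assuming the example
directory layout exists):

{% highlight python %}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("base-path-example").getOrCreate()

# Reading a single partition directory: gender is NOT discovered as a partitioning column.
males_only = spark.read.parquet("path/to/table/gender=male")

# Setting basePath makes partition discovery start at path/to/table,
# so gender is recovered as a partitioning column.
males_with_gender = (spark.read
    .option("basePath", "path/to/table/")
    .parquet("path/to/table/gender=male"))
{% endhighlight %}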

### Schema Merging

Like Protocol Buffers, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
source is now able to automatically detect this case and merge the schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we
turned it off by default starting from Spark 1.5.0. You may enable it by

1. setting data source option `mergeSchema` to `true` when reading Parquet files (as shown in the
   examples below), or
2. setting the global SQL option `spark.sql.parquet.mergeSchema` to `true`.

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example schema_merging scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">

{% include_example schema_merging python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">

{% include_example schema_merging r/RSparkSQLExample.R %}

</div>

</div>
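
As a sketch of the second option above, schema merging can also be enabled globally for the
session instead of per read; the input path below is hypothetical:

{% highlight python %}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("global-merge-schema").getOrCreate()

# Equivalent to passing the mergeSchema option on every Parquet read in this session.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
merged = spark.read.parquet("data/test_table")
{% endhighlight %}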

### Hive metastore Parquet table conversion

When reading from Hive metastore Parquet tables and writing to non-partitioned Hive metastore
Parquet tables, Spark SQL will try to use its own Parquet support instead of Hive SerDe for
better performance. This behavior is controlled by the `spark.sql.hive.convertMetastoreParquet`
configuration, and is turned on by default.
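
If the Hive SerDe path is needed instead, the conversion can be switched off for the current
session; a minimal PySpark sketch using the configuration named above:

{% highlight python %}
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("hive-serde-for-parquet")
    .enableHiveSupport()
    .getOrCreate())

# Fall back to the Hive SerDe for Parquet tables instead of Spark's built-in Parquet support.
spark.sql("SET spark.sql.hive.convertMetastoreParquet=false")
{% endhighlight %}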

#### Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from the perspective of table schema
processing.

1. Hive is case insensitive, while Parquet is not
1. Hive considers all columns nullable, while nullability in Parquet is significant

For this reason, we must reconcile the Hive metastore schema with the Parquet schema when converting a
Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

1. Fields that have the same name in both schemas must have the same data type regardless of
   nullability. The reconciled field should have the data type of the Parquet side, so that
   nullability is respected.

1. The reconciled schema contains exactly those fields defined in the Hive metastore schema.

   - Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
   - Any fields that only appear in the Hive metastore schema are added as nullable fields in the
     reconciled schema.
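
As a hypothetical illustration of these rules (the column names below are invented), suppose the
two schemas of a converted table disagree as follows:

{% highlight text %}
Hive metastore schema:             Parquet file schema:
  key:   int    (nullable)           KEY:   int    (non-nullable)
  value: string (nullable)           extra: string

Reconciled schema:
  key:   int    (non-nullable)   <- matched case-insensitively; the Parquet side's data type
                                     and nullability are kept
  value: string (nullable)       <- appears only in the Hive metastore schema, so it is added
                                     as a nullable field
  (extra is dropped because it appears only in the Parquet schema)
{% endhighlight %}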

#### Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
conversion is enabled, metadata of those converted tables is also cached. If these tables are
updated by Hive or other external tools, you need to refresh them manually to ensure consistent
metadata.

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="r" markdown="1">

{% highlight r %}
refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="SQL" markdown="1">

{% highlight sql %}
REFRESH TABLE my_table;
{% endhighlight %}

</div>

</div>

### Configuration

Configuration of Parquet can be done using the `setConf` method on `SparkSession` or by running
`SET key=value` commands using SQL.
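
For example, both of the following set one of the properties listed below for the current
session (a PySpark sketch; any property name from the table works the same way):

{% highlight python %}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-configuration").getOrCreate()

# Programmatic equivalent of setConf.
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

# The same setting applied via a SQL command.
spark.sql("SET spark.sql.parquet.compression.codec=gzip")
{% endhighlight %}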

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
  <td><code>spark.sql.parquet.binaryAsString</code></td>
  <td>false</td>
  <td>
    Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do
    not differentiate between binary data and strings when writing out the Parquet schema. This
    flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
  </td>
  <td>1.1.1</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
  <td>true</td>
  <td>
    Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This
    flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
  </td>
  <td>1.3.0</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.compression.codec</code></td>
  <td>snappy</td>
  <td>
    Sets the compression codec used when writing Parquet files. If either <code>compression</code> or
    <code>parquet.compression</code> is specified in the table-specific options/properties, the precedence would be
    <code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>. Acceptable values include:
    none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
    Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed before Hadoop 2.9.0, and <code>brotli</code> requires
    <code>BrotliCodec</code> to be installed.
  </td>
  <td>1.1.1</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.filterPushdown</code></td>
  <td>true</td>
  <td>Enables Parquet filter push-down optimization when set to true.</td>
  <td>1.2.0</td>
</tr>
<tr>
  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
  <td>true</td>
  <td>
    When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in
    support.
  </td>
  <td>1.1.1</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.mergeSchema</code></td>
  <td>false</td>
  <td>
    <p>
      When true, the Parquet data source merges schemas collected from all data files, otherwise the
      schema is picked from the summary file or a random data file if no summary file is available.
    </p>
  </td>
  <td>1.5.0</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.writeLegacyFormat</code></td>
  <td>false</td>
  <td>
    If true, data will be written in the way of Spark 1.4 and earlier. For example, decimal values
    will be written in Apache Parquet's fixed-length byte array format, which other systems such as
    Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For
    example, decimals will be written in int-based format. If Parquet output is intended for use
    with systems that do not support this newer format, set to true.
  </td>
  <td>1.6.0</td>
</tr>
</table>