---
layout: global
title: Parquet Files
displayTitle: Parquet Files
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

  • Table of contents {:toc}

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

Loading Data Programmatically

Using the data from the above example:

{% include_example basic_parquet_example python/sql/datasource.py %}

{% include_example basic_parquet_example r/RSparkSQLExample.R %}

{% highlight sql %}

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

{% endhighlight %}

Partition Discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet) are able to discover and infer partitioning information automatically. For example, we can store all our previously used population data into a partitioned table using the following directory structure, with two extra columns, gender and country as partitioning columns:

{% highlight text %}

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

{% endhighlight %}

By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths. Now the schema of the returned DataFrame becomes:

{% highlight text %}

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

{% endhighlight %}

Notice that the data types of the partitioning columns are automatically inferred. Currently, numeric data types, date, timestamp, and string types are supported. Sometimes users may not want the data types of the partitioning columns to be inferred automatically. For these use cases, automatic type inference can be configured with spark.sql.sources.partitionColumnTypeInference.enabled, which defaults to true. When type inference is disabled, the string type is used for the partitioning columns.
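The effect of the inference switch can be pictured with a small, Spark-free sketch. The function `infer_partition_value` and the order in which it tries parsers are assumptions made for illustration only; Spark's actual inference rules are not reproduced here.

```python
from datetime import date, datetime


def infer_partition_value(raw, inference_enabled=True):
    """Convert a raw partition directory value to a typed Python value.

    Hypothetical helper: a simplified model, not Spark's implementation.
    """
    if not inference_enabled:
        # With inference disabled, every partition value stays a string.
        return raw
    # Try each parser in order; the first one that succeeds wins.
    for parse in (int, float, date.fromisoformat, datetime.fromisoformat):
        try:
            return parse(raw)
        except ValueError:
            continue
    # Nothing matched, so fall back to the string type.
    return raw


year_typed = infer_partition_value("2015")
year_as_string = infer_partition_value("2015", inference_enabled=False)
country = infer_partition_value("US")
day = infer_partition_value("2020-01-01")

print(type(year_typed).__name__, type(year_as_string).__name__)
print(type(country).__name__, type(day).__name__)
```

In this model a directory such as `year=2015` yields an integer when inference is on and the string `"2015"` when it is off, which is the distinction the configuration key controls.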

Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths by default. For the above example, if users pass path/to/table/gender=male to either SparkSession.read.parquet or SparkSession.read.load, gender will not be considered as a partitioning column. If users need to specify the base path that partition discovery should start with, they can set basePath in the data source options. For example, when path/to/table/gender=male is the path of the data and users set basePath to path/to/table/, gender will be a partitioning column.
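The role of the base path can be sketched without Spark by looking only at the directory names between a chosen base and the data file. The helper `partition_columns` is hypothetical and is a simplified model of the behavior described above, not Spark's implementation; values are kept as strings.

```python
from pathlib import PurePosixPath


def partition_columns(file_path, base_path):
    """Collect key=value directory segments found between base_path and the file.

    Hypothetical helper used only to illustrate the effect of the base path.
    """
    relative = PurePosixPath(file_path).relative_to(PurePosixPath(base_path))
    columns = {}
    # Skip the final component, which is the data file itself.
    for segment in relative.parts[:-1]:
        if "=" in segment:
            key, value = segment.split("=", 1)
            columns[key] = value  # values stay strings; no type inference here
    return columns


data_file = "path/to/table/gender=male/country=US/data.parquet"

# Discovery rooted at the table directory sees both partition columns.
from_table_root = partition_columns(data_file, "path/to/table")

# Discovery rooted at gender=male only sees what lies below that directory.
from_partition_dir = partition_columns(data_file, "path/to/table/gender=male")

print(from_table_root)
print(from_partition_dir)
```

In PySpark, the corresponding read would look something like `spark.read.option("basePath", "path/to/table/").parquet("path/to/table/gender=male")`, assuming `spark` is an existing SparkSession.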

Schema Merging

Like Protocol Buffers, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema and gradually add more columns to it as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is able to automatically detect this case and merge the schemas of all these files.

Because schema merging is a relatively expensive operation and is unnecessary in most cases, it has been turned off by default since Spark 1.5.0. You may enable it by

  1. setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or
  2. setting the global SQL option spark.sql.parquet.mergeSchema to true.

{% include_example schema_merging python/sql/datasource.py %}

{% include_example schema_merging r/RSparkSQLExample.R %}
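As a rough mental model of what merging "different but mutually compatible" schemas means, the following Spark-free sketch unions two schemas represented as plain dictionaries. The function `merge_schemas`, the column names, and the rule that any type mismatch is an error are all assumptions for illustration; Spark's actual compatibility rules are not reproduced here.

```python
def merge_schemas(left, right):
    """Union two {column: type} schemas; shared columns must agree on type.

    Hypothetical helper: a simplified model, not Spark's merge logic.
    """
    merged = dict(left)
    for name, dtype in right.items():
        if name in merged and merged[name] != dtype:
            raise ValueError(
                f"incompatible types for column {name!r}: {merged[name]} vs {dtype}"
            )
        merged[name] = dtype
    return merged


# Two files written at different times: the second one gained a column.
first_file = {"id": "long", "name": "string"}
second_file = {"id": "long", "email": "string"}

merged = merge_schemas(first_file, second_file)
print(merged)
```

Rows read from a file that lacks one of the merged columns would carry null for that column, which is consistent with all columns being treated as nullable on read.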

Hive metastore Parquet table conversion

When reading from Hive metastore Parquet tables and writing to non-partitioned Hive metastore Parquet tables, Spark SQL will try to use its own Parquet support instead of Hive SerDe for better performance. This behavior is controlled by the spark.sql.hive.convertMetastoreParquet configuration, and is turned on by default.

Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from the perspective of table schema processing.

  1. Hive is case insensitive, while Parquet is not
  2. Hive considers all columns nullable, while nullability in Parquet is significant

For these reasons, we must reconcile the Hive metastore schema with the Parquet schema when converting a Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

  1. Fields that have the same name in both schemas must have the same data type regardless of nullability. The reconciled field should have the data type of the Parquet side, so that nullability is respected.

  2. The reconciled schema contains exactly those fields defined in Hive metastore schema.

    • Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
    • Any fields that only appear in the Hive metastore schema are added as nullable fields in the reconciled schema.
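The rules above can be sketched in plain Python. The `Field` type and the `reconcile` function are hypothetical names, and two details are assumptions not stated in the rules: names are matched case-insensitively (because Hive is case insensitive), and the reconciled field keeps the Hive-side spelling of the name.

```python
from typing import NamedTuple


class Field(NamedTuple):
    name: str
    dtype: str
    nullable: bool


def reconcile(hive_fields, parquet_fields):
    """Apply the two reconciliation rules to a Hive and a Parquet schema."""
    # Assumption: match names case-insensitively, since Hive ignores case.
    parquet_by_name = {f.name.lower(): f for f in parquet_fields}
    reconciled = []
    for hive_field in hive_fields:
        match = parquet_by_name.get(hive_field.name.lower())
        if match is None:
            # Rule 2: Hive-only fields are added as nullable.
            reconciled.append(Field(hive_field.name, hive_field.dtype, True))
            continue
        if match.dtype != hive_field.dtype:
            # Rule 1: shared fields must agree on data type.
            raise ValueError(f"type mismatch for field {hive_field.name!r}")
        # Rule 1: take the Parquet side so its nullability is respected.
        reconciled.append(Field(hive_field.name, match.dtype, match.nullable))
    # Rule 2: Parquet-only fields are never visited, so they are dropped.
    return reconciled


hive_schema = [
    Field("id", "long", True),
    Field("name", "string", True),
    Field("note", "string", True),   # only in Hive
]
parquet_schema = [
    Field("ID", "long", False),      # same field, different case, not nullable
    Field("name", "string", True),
    Field("extra", "double", True),  # only in Parquet
]

result = reconcile(hive_schema, parquet_schema)
for field in result:
    print(field)
```

Here `id` keeps the non-nullable marking from the Parquet side, `note` is added as nullable, and `extra` does not appear in the result.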

Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table conversion is enabled, the metadata of the converted tables is also cached. If these tables are updated by Hive or other external tools, you need to refresh them manually to ensure consistent metadata.

{% highlight scala %}
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

{% highlight java %}
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
{% endhighlight %}

{% highlight python %}
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

{% highlight r %}
refreshTable("my_table")
{% endhighlight %}

{% highlight sql %}
REFRESH TABLE my_table;
{% endhighlight %}

Data Source Option

Data source options of Parquet can be set via:

  • the .option/.options methods of DataFrameReader or DataFrameWriter
  • the .option/.options methods of DataStreamReader or DataStreamWriter

Configuration

Configuration of Parquet can be done using the setConf method on SparkSession or by running SET key=value commands using SQL.