---
title: "Configuration"
url: spark-configuration
aliases:
    - "spark/spark-configuration"
menu:
    main:
        parent: Spark
        weight: 0
---
<!--
 - Licensed to the Apache Software Foundation (ASF) under one or more
 - contributor license agreements.  See the NOTICE file distributed with
 - this work for additional information regarding copyright ownership.
 - The ASF licenses this file to You under the Apache License, Version 2.0
 - (the "License"); you may not use this file except in compliance with
 - the License.  You may obtain a copy of the License at
 -
 -   http://www.apache.org/licenses/LICENSE-2.0
 -
 - Unless required by applicable law or agreed to in writing, software
 - distributed under the License is distributed on an "AS IS" BASIS,
 - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - See the License for the specific language governing permissions and
 - limitations under the License.
 -->

# Spark Configuration

## Catalogs

Spark 3.0 adds an API to plug in table catalogs that are used to load, create, and manage Iceberg tables. Spark catalogs are configured by setting Spark properties under `spark.sql.catalog`.

This creates an Iceberg catalog named `hive_prod` that loads tables from a Hive metastore:

```plain
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml
```

Iceberg also supports a directory-based catalog in HDFS that can be configured using `type=hadoop`:

```plain
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
```

{{< hint info >}}
The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a [session catalog](#replacing-the-session-catalog).
{{< /hint >}}

### Catalog configuration

A catalog is created and named by adding a property `spark.sql.catalog.(catalog-name)` with an implementation class for its value.

Iceberg supplies two implementations:

* `org.apache.iceberg.spark.SparkCatalog` supports a Hive Metastore or a Hadoop warehouse as a catalog
* `org.apache.iceberg.spark.SparkSessionCatalog` adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables

Both catalogs are configured using properties nested under the catalog name. Common configuration properties for Hive and Hadoop are:

| Property                                                      | Values                        | Description |
| ------------------------------------------------------------- | ----------------------------- | ----------- |
| spark.sql.catalog._catalog-name_.type                         | `hive` or `hadoop`            | The underlying Iceberg catalog implementation: `HiveCatalog` or `HadoopCatalog`. Leave unset when using a custom catalog |
| spark.sql.catalog._catalog-name_.catalog-impl                 | `com.my.custom.CatalogImpl`   | The fully qualified class name of a custom Iceberg catalog implementation. When this is set, `type` must be left unset |
| spark.sql.catalog._catalog-name_.default-namespace            | default                       | The default current namespace for the catalog |
| spark.sql.catalog._catalog-name_.uri                          | thrift://host:port            | Metastore connect URI; defaults to `hive.metastore.uris` from `hive-site.xml` |
| spark.sql.catalog._catalog-name_.warehouse                    | hdfs://nn:8020/warehouse/path | Base path for the warehouse directory |
| spark.sql.catalog._catalog-name_.cache-enabled                | `true` or `false`             | Whether to enable the catalog cache; the default is `true` |
| spark.sql.catalog._catalog-name_.cache.expiration-interval-ms | `30000` (30 seconds)          | Duration in milliseconds after which cached catalog entries expire; only effective if `cache-enabled` is `true`. `-1` disables cache expiration and `0` disables caching entirely, irrespective of `cache-enabled`. The default is `30000` (30 seconds) |
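As a sketch of the naming scheme in the table above, the hypothetical helper below (not part of Iceberg or Spark) expands a catalog name, its implementation class, and a map of nested settings into fully qualified Spark properties. It also rejects setting both `type` and `catalog-impl`, following the note that `type` is left unset for a custom catalog.

```scala
// Hypothetical helper (not an Iceberg or Spark API): expands catalog-level
// settings into the fully qualified `spark.sql.catalog.*` properties.
object IcebergCatalogProps {
  private val Prefix = "spark.sql.catalog."

  /** Properties defining catalog `name` with `implClass` and nested `settings`. */
  def forCatalog(
      name: String,
      implClass: String,
      settings: Map[String, String]): Map[String, String] = {
    require(name.nonEmpty, "catalog name must not be empty")
    // `type` is left unset when a custom `catalog-impl` is used.
    require(!(settings.contains("type") && settings.contains("catalog-impl")),
      "set either type or catalog-impl, not both")
    // Each nested setting becomes spark.sql.catalog.<name>.<key>.
    val nested = settings.map { case (key, value) => s"$Prefix$name.$key" -> value }
    // The bare spark.sql.catalog.<name> property names the implementation class.
    nested + (s"$Prefix$name" -> implClass)
  }
}
```

For example, `forCatalog("hadoop_prod", "org.apache.iceberg.spark.SparkCatalog", Map("type" -> "hadoop", "warehouse" -> "hdfs://nn:8020/warehouse/path"))` yields the same three properties as the `hadoop_prod` example earlier on this page; the pairs could then be applied before the session is created, for example with `SparkConf.setAll`.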

Additional properties can be found in the common [catalog configuration](../configuration#catalog-properties).

### Using catalogs

Catalog names are used in SQL queries to identify a table. In the examples above, `hive_prod` and `hadoop_prod` can be used to prefix database and table names that will be loaded from those catalogs.

```sql
SELECT * FROM hive_prod.db.table -- load db.table from catalog hive_prod
```

Spark 3 keeps track of the current catalog and namespace, so both can be omitted from table names.

```sql
USE hive_prod.db;
SELECT * FROM table -- load db.table from catalog hive_prod
```

To see the current catalog and namespace, run `SHOW CURRENT NAMESPACE`.
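To make the lookup rules concrete, here is a simplified, hypothetical model of how a name such as `hive_prod.db.table`, `db.table`, or a bare `table` is split into catalog, namespace, and table. It approximates Spark's behavior for illustration only and is not Spark's resolver: it ignores quoted identifiers containing dots, temporary views, and case sensitivity.

```scala
// Simplified, hypothetical model of table-name lookup (not Spark's resolver).
object NameResolution {
  final case class Resolved(catalog: String, namespace: Seq[String], table: String)

  def resolve(
      name: String,
      knownCatalogs: Set[String],
      currentCatalog: String,
      currentNamespace: Seq[String]): Resolved = {
    val parts: Seq[String] = name.split('.').toSeq
    require(parts.nonEmpty && parts.forall(_.nonEmpty), s"invalid table name: $name")
    parts match {
      // `table`: use the current catalog and the current namespace.
      case Seq(table) => Resolved(currentCatalog, currentNamespace, table)
      // `hive_prod.db.table`: a leading catalog name selects that catalog.
      case first +: rest if knownCatalogs.contains(first) =>
        Resolved(first, rest.init, rest.last)
      // `db.table`: the leading parts are a namespace in the current catalog.
      case _ => Resolved(currentCatalog, parts.init, parts.last)
    }
  }
}
```

In this model, after `USE hive_prod.db`, both `table` and `hive_prod.db.table` resolve to catalog `hive_prod`, namespace `db`, table `table`.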

### Replacing the session catalog

To add Iceberg table support to Spark's built-in catalog, configure `spark_catalog` to use Iceberg's `SparkSessionCatalog`.

```plain
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
```

Spark's built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg's `SparkSessionCatalog` as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog will be used to load it instead.

This configuration can use the same Hive Metastore for both Iceberg and non-Iceberg tables.
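The delegation described above can be pictured with a small, hypothetical model. The names `SimpleCatalog`, `MapCatalog`, and `WrappingSessionCatalog` are illustrative and are not Iceberg or Spark classes: a lookup goes to the Iceberg catalog first, and a table it does not recognize is loaded from the built-in catalog instead.

```scala
// Simplified, hypothetical model of the delegation described above; it is
// not the SparkSessionCatalog source.
object SessionCatalogModel {
  sealed trait LoadedTable
  final case class IcebergTable(name: String) extends LoadedTable
  final case class BuiltInTable(name: String) extends LoadedTable

  /** Minimal catalog abstraction: None means "no such table here". */
  trait SimpleCatalog {
    def load(name: String): Option[LoadedTable]
  }

  /** In-memory stand-in for a metastore-backed catalog. */
  final class MapCatalog(tables: Map[String, LoadedTable]) extends SimpleCatalog {
    def load(name: String): Option[LoadedTable] = tables.get(name)
  }

  /** Wraps the built-in catalog: try Iceberg first, then fall back. */
  final class WrappingSessionCatalog(iceberg: SimpleCatalog, builtIn: SimpleCatalog)
      extends SimpleCatalog {
    def load(name: String): Option[LoadedTable] =
      iceberg.load(name).orElse(builtIn.load(name))
  }
}
```

In this model, a name known only to the built-in catalog comes back as a `BuiltInTable`, while a name known to neither catalog returns `None`.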

### Using catalog-specific Hadoop configuration values

Similar to configuring Hadoop properties by using `spark.hadoop.*`, it's possible to set per-catalog Hadoop configuration values when using Spark by adding the property for the catalog with the prefix `spark.sql.catalog.(catalog-name).hadoop.*`. These properties will take precedence over values configured globally using `spark.hadoop.*` and will only affect Iceberg tables.

```plain
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint = http://aws-local:9000
```
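The precedence rule can be illustrated with a hypothetical sketch that works on a plain `Map` of Spark properties rather than a real `SparkConf` or Hadoop `Configuration`: it collects the global `spark.hadoop.*` values, then lets the catalog's own `hadoop.*` values override them. It models the documented behavior and is not Iceberg's implementation.

```scala
// Hypothetical sketch of the precedence rule: catalog-scoped
// `spark.sql.catalog.<name>.hadoop.*` values override global
// `spark.hadoop.*` values, for that catalog only.
object CatalogHadoopConf {
  private val GlobalPrefix = "spark.hadoop."

  /** Effective Hadoop settings that catalog `catalog` would see. */
  def effectiveHadoopConf(sparkConf: Map[String, String], catalog: String): Map[String, String] = {
    val catalogPrefix = s"spark.sql.catalog.$catalog.hadoop."
    // Keep keys with the given prefix, with the prefix removed.
    def withPrefix(prefix: String): Map[String, String] =
      sparkConf.collect { case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v }
    // In `++`, entries from the right-hand map win, so catalog values take precedence.
    withPrefix(GlobalPrefix) ++ withPrefix(catalogPrefix)
  }
}
```

With `spark.hadoop.fs.s3a.endpoint` set globally and the `hadoop_prod` override from the example above, `effectiveHadoopConf(conf, "hadoop_prod")` would map `fs.s3a.endpoint` to `http://aws-local:9000`, while any other catalog would still see the global endpoint.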

### Loading a custom catalog

Spark supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property. Here is an example:

```plain
spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value
```
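Spark hands a catalog plugin every property under `spark.sql.catalog.(catalog-name).` with that prefix removed (as the `CaseInsensitiveStringMap` passed to `CatalogPlugin.initialize`), and `SparkCatalog` uses those options to initialize the custom catalog. That is how a setting such as `my-additional-catalog-config` should reach `com.my.custom.CatalogImpl`. The hypothetical sketch below models only the prefix stripping, using a plain `Map`.

```scala
// Hypothetical sketch: models how nested catalog properties are handed to a
// catalog implementation, with the `spark.sql.catalog.<name>.` prefix removed.
object CatalogOptions {
  def optionsFor(sparkConf: Map[String, String], catalog: String): Map[String, String] = {
    val prefix = s"spark.sql.catalog.$catalog."
    // Keep only this catalog's nested keys; the bare implementation-class
    // property has no trailing dot, so it is not included.
    sparkConf.collect { case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value }
  }
}
```

Applied to the three `custom_prod` properties above, this yields `catalog-impl -> com.my.custom.CatalogImpl` and `my-additional-catalog-config -> my-value`.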

### Catalogs in Spark 2.4

When using Iceberg 0.11.0 and later, Spark 2.4 can load tables from multiple Iceberg catalogs or from table locations.

Catalogs in 2.4 are configured just like catalogs in 3.x, but only Iceberg catalogs are supported.

## SQL Extensions

Iceberg 0.11.0 and later include an extension module for Spark that adds new SQL commands, such as `CALL` for stored procedures or `ALTER TABLE ... WRITE ORDERED BY`.

Using those SQL commands requires adding Iceberg extensions to your Spark environment using the following Spark property:

| Spark extensions property | Iceberg extensions implementation                                   |
|---------------------------|---------------------------------------------------------------------|
| `spark.sql.extensions`    | `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions` |
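`spark.sql.extensions` is a static setting, so it has to be in place before the session starts, and in Spark 3.x it accepts a comma-separated list of extension classes. The hypothetical helper below appends Iceberg's class to whatever value is already configured instead of overwriting it.

```scala
// Hypothetical helper: adds Iceberg's extensions class to an existing
// `spark.sql.extensions` value (a comma-separated list in Spark 3.x).
object SqlExtensions {
  val IcebergExtensions = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

  def withIceberg(current: Option[String]): String = {
    // Split any existing value into trimmed, non-empty class names.
    val existing = current.toSeq.flatMap(_.split(',')).map(_.trim).filter(_.nonEmpty)
    // Keep extensions already configured and avoid listing Iceberg's twice.
    (existing :+ IcebergExtensions).distinct.mkString(",")
  }
}
```

For example, `SqlExtensions.withIceberg(Some("com.example.MyExtensions"))` yields `com.example.MyExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`, where `com.example.MyExtensions` is a made-up placeholder.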

SQL extensions are not available for Spark 2.4.

## Runtime configuration

### Read options

Spark read options are passed when configuring the DataFrameReader, like this:

```scala
// time travel
spark.read
    .option("snapshot-id", 10963874102873L)
    .table("catalog.db.table")
```

| Spark option          | Default               | Description                                                                               |
| --------------------- | --------------------- | ----------------------------------------------------------------------------------------- |
| snapshot-id           | (latest)              | Snapshot ID of the table snapshot to read                                                 |
| as-of-timestamp       | (latest)              | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
| split-size            | As per table property | Overrides this table's read.split.target-size and read.split.metadata-target-size         |
| lookback              | As per table property | Overrides this table's read.split.planning-lookback                                       |
| file-open-cost        | As per table property | Overrides this table's read.split.open-file-cost                                          |
| vectorization-enabled | As per table property | Overrides this table's read.parquet.vectorization.enabled                                 |
| batch-size            | As per table property | Overrides this table's read.parquet.vectorization.batch-size                              |
| stream-from-timestamp | (none)                | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
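Because `as-of-timestamp` takes epoch milliseconds, a human-readable instant has to be converted first. The hypothetical `TimeTravel` helper below uses `java.time.Instant` for that conversion and builds the option map for either kind of time travel.

```scala
import java.time.Instant

// Hypothetical helper: builds read options for time travel.
object TimeTravel {
  // `as-of-timestamp` expects epoch milliseconds, so parse an ISO-8601 instant.
  def asOf(instant: String): Map[String, String] =
    Map("as-of-timestamp" -> Instant.parse(instant).toEpochMilli.toString)

  // `snapshot-id` selects one specific snapshot by its ID.
  def atSnapshot(snapshotId: Long): Map[String, String] =
    Map("snapshot-id" -> snapshotId.toString)
}
```

`TimeTravel.asOf("2021-01-01T00:00:00Z")` returns `Map("as-of-timestamp" -> "1609459200000")`, which could be passed to `spark.read.options(...)` before `.table("catalog.db.table")`. Set only one of `snapshot-id` or `as-of-timestamp` for a read, since each one selects the snapshot to read.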

### Write options

Spark write options are passed when configuring the DataFrameWriter, like this:

```scala
// write with Avro instead of Parquet
df.write
    .option("write-format", "avro")
    // add a custom key/value entry to the snapshot summary of this commit
    .option("snapshot-property.key", "value")
    .insertInto("catalog.db.table")
```

| Spark option                   | Default                    | Description |
| ------------------------------ | -------------------------- | ----------- |
| write-format                   | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes         | As per table property      | Overrides this table's write.target-file-size-bytes |
| check-nullability              | true                       | Sets the nullable check on fields |
| snapshot-property._custom-key_ | null                       | Adds an entry with key _custom-key_ and the corresponding value to the snapshot summary |
| fanout-enabled                 | false                      | Overrides this table's write.spark.fanout.enabled |
| check-ordering                 | true                       | Checks whether the input schema and the table schema are the same |
| isolation-level                | null                       | Desired isolation level for DataFrame overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => check for concurrent deletes in destination partitions. |
| validate-from-snapshot-id      | null                       | If an isolation level is set, the ID of the base snapshot from which to check for concurrent write conflicts in the table. This should be the snapshot before any reads from the table. It can be obtained via the [Table API](../../api#table-metadata) or the [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |
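The hypothetical helper below assembles the write options for an overwrite that should detect conflicting commits. It accepts only `serializable` or `snapshot` (omit `isolation-level` entirely for the `null` behavior), optionally pins `validate-from-snapshot-id`, and prefixes custom summary keys with `snapshot-property.`.

```scala
// Hypothetical helper: assembles write options for a conflict-checked
// overwrite, using the option names from the table above.
object OverwriteOptions {
  private val CheckedLevels = Set("serializable", "snapshot")

  def build(
      isolationLevel: String,
      validateFromSnapshotId: Option[Long],
      summary: Map[String, String] = Map.empty): Map[String, String] = {
    require(CheckedLevels.contains(isolationLevel), s"unsupported isolation level: $isolationLevel")
    // Pin the base snapshot only when one is given.
    val base: Map[String, String] =
      Map("isolation-level" -> isolationLevel) ++
        validateFromSnapshotId.map(id => "validate-from-snapshot-id" -> id.toString)
    // Custom snapshot summary entries use the `snapshot-property.` prefix.
    base ++ summary.map { case (key, value) => s"snapshot-property.$key" -> value }
  }
}
```

For example, `OverwriteOptions.build("serializable", Some(10963874102873L), Map("job-id" -> "nightly"))`, where `job-id` is a made-up summary key, produces `isolation-level -> serializable`, `validate-from-snapshot-id -> 10963874102873`, and `snapshot-property.job-id -> nightly`. The map could then be passed to `DataFrameWriter.options` or `DataFrameWriterV2.options`.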