blob: d2267d821865f7d6c160986a7eba39cc24cb0b1c [file] [log] [blame] [view]
---
title: "Configuration"
url: spark-configuration
aliases:
- "spark/spark-configuration"
menu:
main:
parent: Spark
weight: 0
---
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->
# Spark Configuration
## Catalogs
Spark adds an API to plug in table catalogs that are used to load, create, and manage Iceberg tables. Spark catalogs are configured by setting Spark properties under `spark.sql.catalog`.
This creates an Iceberg catalog named `hive_prod` that loads tables from a Hive metastore:
```plain
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml
```
Iceberg also supports a directory-based catalog in HDFS that can be configured using `type=hadoop`:
```plain
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
```
{{< hint info >}}
The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a [session catalog](#replacing-the-session-catalog).
{{< /hint >}}
### Catalog configuration
A catalog is created and named by adding a property `spark.sql.catalog.(catalog-name)` with an implementation class for its value.
Iceberg supplies two implementations:
* `org.apache.iceberg.spark.SparkCatalog` supports a Hive Metastore or a Hadoop warehouse as a catalog
* `org.apache.iceberg.spark.SparkSessionCatalog` adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables
Both catalogs are configured using properties nested under the catalog name. Common configuration properties for Hive and Hadoop are:
| Property | Values | Description |
| -------------------------------------------------- | ----------------------------- | -------------------------------------------------------------------- |
| spark.sql.catalog._catalog-name_.type | `hive` or `hadoop` | The underlying Iceberg catalog implementation, `HiveCatalog`, `HadoopCatalog` or left unset if using a custom catalog |
| spark.sql.catalog._catalog-name_.catalog-impl | | The underlying Iceberg catalog implementation.|
| spark.sql.catalog._catalog-name_.default-namespace | default | The default current namespace for the catalog |
| spark.sql.catalog._catalog-name_.uri | thrift://host:port | Metastore connect URI; default from `hive-site.xml` |
| spark.sql.catalog._catalog-name_.warehouse | hdfs://nn:8020/warehouse/path | Base path for the warehouse directory |
| spark.sql.catalog._catalog-name_.cache-enabled | `true` or `false` | Whether to enable catalog cache, default value is `true` |
| spark.sql.catalog._catalog-name_.cache.expiration-interval-ms | `30000` (30 seconds) | Duration after which cached catalog entries are expired; Only effective if `cache-enabled` is `true`. `-1` disables cache expiration and `0` disables caching entirely, irrespective of `cache-enabled`. Default is `30000` (30 seconds) | |
Additional properties can be found in common [catalog configuration](../configuration#catalog-properties).
### Using catalogs
Catalog names are used in SQL queries to identify a table. In the examples above, `hive_prod` and `hadoop_prod` can be used to prefix database and table names that will be loaded from those catalogs.
```sql
SELECT * FROM hive_prod.db.table -- load db.table from catalog hive_prod
```
Spark 3 keeps track of the current catalog and namespace, which can be omitted from table names.
```sql
USE hive_prod.db;
SELECT * FROM table -- load db.table from catalog hive_prod
```
To see the current catalog and namespace, run `SHOW CURRENT NAMESPACE`.
### Replacing the session catalog
To add Iceberg table support to Spark's built-in catalog, configure `spark_catalog` to use Iceberg's `SparkSessionCatalog`.
```plain
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
```
Spark's built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg's `SparkSessionCatalog` as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog will be used to load it instead.
This configuration can use same Hive Metastore for both Iceberg and non-Iceberg tables.
### Using catalog specific Hadoop configuration values
Similar to configuring Hadoop properties by using `spark.hadoop.*`, it's possible to set per-catalog Hadoop configuration values when using Spark by adding the property for the catalog with the prefix `spark.sql.catalog.(catalog-name).hadoop.*`. These properties will take precedence over values configured globally using `spark.hadoop.*` and will only affect Iceberg tables.
```plain
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint = http://aws-local:9000
```
### Loading a custom catalog
Spark supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property. Here is an example:
```plain
spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value
```
### Catalogs in Spark 2.4
When using Iceberg 0.11.0 and later, Spark 2.4 can load tables from multiple Iceberg catalogs or from table locations.
Catalogs in 2.4 are configured just like catalogs in 3.x, but only Iceberg catalogs are supported.
## SQL Extensions
Iceberg 0.11.0 and later add an extension module to Spark to add new SQL commands, like `CALL` for stored procedures or `ALTER TABLE ... WRITE ORDERED BY`.
Using those SQL commands requires adding Iceberg extensions to your Spark environment using the following Spark property:
| Spark extensions property | Iceberg extensions implementation |
|---------------------------|---------------------------------------------------------------------|
| `spark.sql.extensions` | `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions` |
SQL extensions are not available for Spark 2.4.
## Runtime configuration
### Read options
Spark read options are passed when configuring the DataFrameReader, like this:
```scala
// time travel
spark.read
.option("snapshot-id", 10963874102873L)
.table("catalog.db.table")
```
| Spark option | Default | Description |
| --------------- | --------------------- | ----------------------------------------------------------------------------------------- |
| snapshot-id | (latest) | Snapshot ID of the table snapshot to read |
| as-of-timestamp | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
| split-size | As per table property | Overrides this table's read.split.target-size and read.split.metadata-target-size |
| lookback | As per table property | Overrides this table's read.split.planning-lookback |
| file-open-cost | As per table property | Overrides this table's read.split.open-file-cost |
| vectorization-enabled | As per table property | Overrides this table's read.parquet.vectorization.enabled |
| batch-size | As per table property | Overrides this table's read.parquet.vectorization.batch-size |
| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
### Write options
Spark write options are passed when configuring the DataFrameWriter, like this:
```scala
// write with Avro instead of Parquet
df.write
.option("write-format", "avro")
.option("snapshot-property.key", "value")
.insertInto("catalog.db.table")
```
| Spark option | Default | Description |
| ---------------------- | -------------------------- | ------------------------------------------------------------ |
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| check-nullability | true | Sets the nullable check on fields |
| snapshot-property._custom-key_ | null | Adds an entry with custom-key and corresponding value in the snapshot summary |
| fanout-enabled | false | Overrides this table's write.spark.fanout.enabled |
| check-ordering | true | Checks if input schema and table schema are same |
| isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
| validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../../api#table-metadata) or [Snapshots table](../spark-queries#snapshots). If null, the table's oldest known snapshot is used. |