Spark adds an API to plug in table catalogs that are used to load, create, and manage Iceberg tables. Spark catalogs are configured by setting Spark properties under `spark.sql.catalog`.
This creates an Iceberg catalog named `hive_prod` that loads tables from a Hive metastore:

    spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.hive_prod.type = hive
    spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
    # omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml
Below is an example for a REST catalog named `rest_prod` that loads tables from REST URL `http://localhost:8080`:

    spark.sql.catalog.rest_prod = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.rest_prod.type = rest
    spark.sql.catalog.rest_prod.uri = http://localhost:8080
Iceberg also supports a directory-based catalog in HDFS that can be configured using `type=hadoop`:

    spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.hadoop_prod.type = hadoop
    spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
{{< hint info >}} The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a session catalog. {{< /hint >}}
A catalog is created and named by adding a property `spark.sql.catalog.(catalog-name)` with an implementation class for its value.
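As a rough illustration of this naming scheme, the sketch below assembles the property keys for one catalog in plain Python. The helper `catalog_properties` is hypothetical and is not part of Iceberg or Spark; it only shows how a catalog name and its nested options map onto `spark.sql.catalog.*` keys.

```python
SPARK_CATALOG = "org.apache.iceberg.spark.SparkCatalog"


def catalog_properties(name, options, impl=SPARK_CATALOG):
    """Build the Spark properties that define a single catalog.

    Hypothetical helper for illustration; not an Iceberg or Spark API.
    """
    prefix = f"spark.sql.catalog.{name}"
    # The bare prefix names the catalog and selects its implementation class.
    props = {prefix: impl}
    # Every other setting is nested under the catalog name.
    for key, value in options.items():
        props[f"{prefix}.{key}"] = str(value)
    return props


# The REST catalog from the example above.
rest_prod = catalog_properties(
    "rest_prod", {"type": "rest", "uri": "http://localhost:8080"}
)

for key, value in rest_prod.items():
    print(f"{key} = {value}")
```

In PySpark, each resulting pair could be passed to `SparkSession.builder.config(key, value)` before the session is created.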
Iceberg supplies two implementations:

* `org.apache.iceberg.spark.SparkCatalog` supports a Hive Metastore or a Hadoop warehouse as a catalog
* `org.apache.iceberg.spark.SparkSessionCatalog` adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables

Both catalogs are configured using properties nested under the catalog name. Common configuration properties for Hive and Hadoop are:
Property | Values | Description |
---|---|---|
spark.sql.catalog.catalog-name.type | `hive`, `hadoop` or `rest` | The underlying Iceberg catalog implementation, `HiveCatalog`, `HadoopCatalog`, `RESTCatalog` or left unset if using a custom catalog |
spark.sql.catalog.catalog-name.catalog-impl | | The custom Iceberg catalog implementation. If `type` is null, `catalog-impl` must not be null. |
spark.sql.catalog.catalog-name.io-impl | | The custom FileIO implementation. |
spark.sql.catalog.catalog-name.metrics-reporter-impl | | The custom MetricsReporter implementation. |
spark.sql.catalog.catalog-name.default-namespace | default | The default current namespace for the catalog |
spark.sql.catalog.catalog-name.uri | thrift://host:port | Hive metastore URL for hive typed catalog, REST URL for REST typed catalog |
spark.sql.catalog.catalog-name.warehouse | hdfs://nn:8020/warehouse/path | Base path for the warehouse directory |
spark.sql.catalog.catalog-name.cache-enabled | true or false | Whether to enable catalog cache, default value is true |
spark.sql.catalog.catalog-name.cache.expiration-interval-ms | 30000 (30 seconds) | Duration after which cached catalog entries are expired; only effective if `cache-enabled` is `true`. -1 disables cache expiration and 0 disables caching entirely, irrespective of `cache-enabled`. Default is 30000 (30 seconds) |
spark.sql.catalog.catalog-name.table-default.propertyKey | | Default Iceberg table property value for property key propertyKey, which will be set on tables created by this catalog if not overridden |
spark.sql.catalog.catalog-name.table-override.propertyKey | | Enforced Iceberg table property value for property key propertyKey, which cannot be overridden by user |
Additional properties can be found in common catalog configuration.
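The interaction between `cache-enabled` and `cache.expiration-interval-ms` can be easier to follow as a decision function. The sketch below models only the rules stated in the table; `cache_behavior` is a hypothetical name, and rejecting values below -1 is an assumption of this sketch rather than documented behavior.

```python
def cache_behavior(cache_enabled=True, expiration_interval_ms=30000):
    """Describe the catalog cache behavior implied by the two cache settings.

    Models only the rules stated in the property table; not Iceberg's own code.
    """
    if expiration_interval_ms < -1:
        # Assumption: the table does not describe values below -1, so reject them.
        raise ValueError("expiration interval below -1 is not covered by the table")
    if expiration_interval_ms == 0:
        # 0 turns caching off regardless of cache-enabled.
        return "no caching"
    if not cache_enabled:
        return "no caching"
    if expiration_interval_ms == -1:
        # -1 keeps the cache but switches expiration off.
        return "cached, never expired"
    return f"cached, expired after {expiration_interval_ms} ms"


print(cache_behavior())                                  # cached, expired after 30000 ms
print(cache_behavior(expiration_interval_ms=-1))         # cached, never expired
print(cache_behavior(True, expiration_interval_ms=0))    # no caching
print(cache_behavior(cache_enabled=False))               # no caching
```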
Catalog names are used in SQL queries to identify a table. In the examples above, `hive_prod` and `hadoop_prod` can be used to prefix database and table names that will be loaded from those catalogs.

    SELECT * FROM hive_prod.db.table -- load db.table from catalog hive_prod
Spark 3 keeps track of the current catalog and namespace, which can be omitted from table names.

    USE hive_prod.db;
    SELECT * FROM table -- load db.table from catalog hive_prod
To see the current catalog and namespace, run `SHOW CURRENT NAMESPACE`.
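To make the effect of the current catalog and namespace concrete, here is a deliberately simplified model of how a table name might be expanded. It is not Spark's resolver: `resolve_table` is a hypothetical function, quoted identifiers are not handled, and the set of catalog names is passed in by hand.

```python
def resolve_table(name, current_catalog, current_namespace, catalogs):
    """Expand a table name to a (catalog, namespace, table) tuple.

    Simplified model for illustration only; Spark's real resolution
    handles quoting, views, and other cases that are ignored here.
    """
    parts = name.split(".")
    if len(parts) == 1:
        # Bare table name: fall back to the current catalog and namespace.
        return (current_catalog, current_namespace, parts[0])
    if parts[0] in catalogs:
        # The first part names a configured catalog.
        catalog, rest = parts[0], parts[1:]
    else:
        # No catalog prefix: the whole name lives in the current catalog.
        catalog, rest = current_catalog, parts
    return (catalog, ".".join(rest[:-1]), rest[-1])


catalogs = {"hive_prod", "hadoop_prod"}

# Fully qualified name, as in the first query above.
print(resolve_table("hive_prod.db.table", "spark_catalog", "default", catalogs))

# After `USE hive_prod.db`, the bare name refers to the same table.
print(resolve_table("table", "hive_prod", "db", catalogs))
```

Both calls produce the same tuple, which mirrors the two SQL examples above.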
To add Iceberg table support to Spark's built-in catalog, configure `spark_catalog` to use Iceberg's `SparkSessionCatalog`.

    spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
    spark.sql.catalog.spark_catalog.type = hive
Spark's built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg's `SparkSessionCatalog` as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog will be used to load it instead.
This configuration can use the same Hive Metastore for both Iceberg and non-Iceberg tables.
Similar to configuring Hadoop properties by using `spark.hadoop.*`, it's possible to set per-catalog Hadoop configuration values when using Spark by adding the property for the catalog with the prefix `spark.sql.catalog.(catalog-name).hadoop.*`. These properties will take precedence over values configured globally using `spark.hadoop.*` and will only affect Iceberg tables.

    spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint = http://aws-local:9000
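The precedence rule can be sketched as a two-pass merge over the Spark properties. The helper `effective_hadoop_conf` and the global values in the sample input are made up for illustration; only the per-catalog endpoint comes from the example above.

```python
def effective_hadoop_conf(spark_props, catalog_name):
    """Merge global and per-catalog Hadoop settings for one catalog.

    Hypothetical helper that models the precedence rule described above.
    """
    global_prefix = "spark.hadoop."
    catalog_prefix = f"spark.sql.catalog.{catalog_name}.hadoop."
    conf = {}
    # Start with values set globally through spark.hadoop.*
    for key, value in spark_props.items():
        if key.startswith(global_prefix):
            conf[key[len(global_prefix):]] = value
    # Per-catalog values are applied last, so they win on conflicts.
    for key, value in spark_props.items():
        if key.startswith(catalog_prefix):
            conf[key[len(catalog_prefix):]] = value
    return conf


props = {
    # Assumed global settings, invented for this example.
    "spark.hadoop.fs.s3a.endpoint": "http://global-endpoint:9000",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    # Per-catalog override taken from the example above.
    "spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint": "http://aws-local:9000",
}

print(effective_hadoop_conf(props, "hadoop_prod"))
```

For `hadoop_prod` the endpoint resolves to the per-catalog value, while a catalog without its own override would keep the global one.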
Spark supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property. Here is an example:

    spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
    spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value
Iceberg 0.11.0 and later add an extension module to Spark to add new SQL commands, like `CALL` for stored procedures or `ALTER TABLE ... WRITE ORDERED BY`.
Using those SQL commands requires adding Iceberg extensions to your Spark environment using the following Spark property:
Spark extensions property | Iceberg extensions implementation |
---|---|
spark.sql.extensions | org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
Spark read options are passed when configuring the DataFrameReader, like this:

    // time travel
    spark.read
        .option("snapshot-id", 10963874102873L)
        .table("catalog.db.table")
Spark option | Default | Description |
---|---|---|
snapshot-id | (latest) | Snapshot ID of the table snapshot to read |
as-of-timestamp | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
split-size | As per table property | Overrides this table's read.split.target-size and read.split.metadata-target-size |
lookback | As per table property | Overrides this table's read.split.planning-lookback |
file-open-cost | As per table property | Overrides this table's read.split.open-file-cost |
vectorization-enabled | As per table property | Overrides this table's read.parquet.vectorization.enabled |
batch-size | As per table property | Overrides this table's read.parquet.vectorization.batch-size |
stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
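Because `as-of-timestamp` and `stream-from-timestamp` expect milliseconds, a small conversion step is usually needed when starting from a calendar date. The sketch below shows one way to do that with the Python standard library; `to_epoch_millis` is a hypothetical helper name, and the date used is arbitrary.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid ambiguity")
    # Integer division by one millisecond avoids float rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


as_of = to_epoch_millis(datetime(2021, 1, 1, tzinfo=timezone.utc))
print(as_of)  # 1609459200000
```

In PySpark, the value could then be supplied on the reader as `.option("as-of-timestamp", str(as_of))`.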
Spark write options are passed when configuring the DataFrameWriter, like this:

    // write with Avro instead of Parquet
    df.write
        .option("write-format", "avro")
        .option("snapshot-property.key", "value")
        .insertInto("catalog.db.table")
Spark option | Default | Description |
---|---|---|
write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
check-nullability | true | Sets the nullable check on fields |
snapshot-property.custom-key | null | Adds an entry with custom-key and corresponding value in the snapshot summary |
fanout-enabled | false | Overrides this table's write.spark.fanout.enabled |
check-ordering | true | Checks if input schema and table schema are the same |
isolation-level | null | Desired isolation level for DataFrame overwrite operations. null => no checks (for idempotent writes), serializable => check for concurrent inserts or deletes in destination partitions, snapshot => checks for concurrent deletes in destination partitions. |
validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via Table API or Snapshots table. If null, the table's oldest known snapshot is used. |
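The three `isolation-level` settings differ only in which concurrent operations in the destination partitions are checked. The sketch below restates that mapping from the table as a lookup; `checked_operations` is a hypothetical name and the code does not reflect how Iceberg implements the validation.

```python
def checked_operations(isolation_level=None):
    """Return the concurrent operations checked for a given isolation level.

    Restates the isolation-level row of the table; not Iceberg's own logic.
    """
    if isolation_level is None:
        # null: no checks, intended for idempotent writes.
        return set()
    if isolation_level == "serializable":
        return {"insert", "delete"}
    if isolation_level == "snapshot":
        return {"delete"}
    raise ValueError(f"unknown isolation level: {isolation_level}")


for level in (None, "serializable", "snapshot"):
    print(level, sorted(checked_operations(level)))
```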