Getting Started

Using Iceberg in Spark 3

The latest version of Iceberg is 0.10.0.

To use Iceberg in a Spark shell, use the --packages option:

spark-shell --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0

!!! Note If you want to include Iceberg in your Spark installation, add the iceberg-spark3-runtime Jar to Spark's jars folder.

Adding catalogs

Iceberg comes with catalogs that enable SQL commands to manage tables and load them by name. Catalogs are configured using properties under spark.sql.catalog.(catalog_name).

This command creates a path-based catalog named local for tables under $PWD/warehouse and adds support for Iceberg tables to Spark's built-in catalog:

spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0 \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse
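
The same catalog properties can be set when building a SparkSession in application code. This is a minimal sketch, assuming iceberg-spark3-runtime is already on the application's classpath; the application name is illustrative:

import org.apache.spark.sql.SparkSession

// Same catalog properties as the spark-sql command above.
// "iceberg-getting-started" is an illustrative application name.
val spark = SparkSession.builder()
  .appName("iceberg-getting-started")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  // equivalent of $PWD/warehouse: a path under the current working directory
  .config("spark.sql.catalog.local.warehouse", s"${System.getProperty("user.dir")}/warehouse")
  .getOrCreate()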

Creating a table

To create your first Iceberg table in Spark, use the spark-sql shell or spark.sql(...) to run a CREATE TABLE command:

-- local is the path-based catalog defined above
CREATE TABLE local.db.table (id bigint, data string) USING iceberg

Iceberg catalogs support the full range of SQL DDL commands, including:

- CREATE TABLE ... PARTITIONED BY
- CREATE TABLE ... AS SELECT
- ALTER TABLE
- DROP TABLE
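
For example, a few of these commands can be run through spark.sql(...) from a Scala session; in this sketch the local.db.sample table and the category column are illustrative:

// local.db.sample and the category column are illustrative names
spark.sql("CREATE TABLE local.db.sample USING iceberg AS SELECT id, data FROM local.db.table")
spark.sql("ALTER TABLE local.db.sample ADD COLUMNS (category string)")
spark.sql("DROP TABLE local.db.sample")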

Writing

Once your table is created, insert data using INSERT INTO:

INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;

Iceberg supports writing DataFrames using the new v2 DataFrame write API:

spark.table("source").select("id", "data")
     .writeTo("local.db.table").append()

The old write API is supported, but not recommended.

Reading

To read with SQL, use the Iceberg table name in a SELECT query:

SELECT count(1) as count, data
FROM local.db.table
GROUP BY data

SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the snapshots metadata table:

SELECT * FROM local.db.table.snapshots
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
| committed_at            | snapshot_id    | parent_id | operation | manifest_list                                      | ... |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
| 2019-02-08 03:29:51.215 | 57897183625154 | null      | append    | s3://.../table/metadata/snap-57897183625154-1.avro | ... |
| ...                     | ...            | ...       | ...       | ...                                                | ... |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
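
The same metadata table can be queried from a Scala session, assuming the catalogs configured above, for example:

// Same query run from code; assumes the catalogs configured above
spark.sql("SELECT committed_at, snapshot_id, operation FROM local.db.table.snapshots")
     .show(truncate = false)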

DataFrame reads are supported and can now reference tables by name using spark.table:

val df = spark.table("local.db.table")
df.count()
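
For comparison, a sketch of the grouped query from the SQL example above, expressed with the DataFrame API on the same df:

// Equivalent of the GROUP BY query above, using the DataFrame API
df.groupBy("data").count().show()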

Next steps

Next, you can learn more about Iceberg tables in Spark, or about the Iceberg Table API.