The latest version of Iceberg is 0.11.0.
To use Iceberg in a Spark shell, use the `--packages` option:

```sh
spark-shell --packages org.apache.iceberg:iceberg-spark3-runtime:0.11.0
```
!!! Note
    If you want to include Iceberg in your Spark installation, add the `iceberg-spark3-runtime` Jar to Spark's `jars` folder.
Iceberg comes with catalogs that enable SQL commands to manage tables and load them by name. Catalogs are configured using properties under `spark.sql.catalog.(catalog_name)`.
This command creates a path-based catalog named `local` for tables under `$PWD/warehouse` and adds support for Iceberg tables to Spark's built-in catalog:

```sh
spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.11.0 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse
```
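The same catalog properties can also be set programmatically when building a `SparkSession` in an application, instead of passing `--conf` flags. A minimal sketch, assuming the same `local` catalog and warehouse path as above (the application name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Equivalent of the --conf flags above, set on the session builder.
val spark = SparkSession.builder()
  .appName("iceberg-getting-started") // hypothetical application name
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", sys.props("user.dir") + "/warehouse")
  .getOrCreate()
```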
To create your first Iceberg table in Spark, use the `spark-sql` shell or `spark.sql(...)` to run a `CREATE TABLE` command:
```sql
-- local is the path-based catalog defined above
CREATE TABLE local.db.table (id bigint, data string) USING iceberg
```
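From a Spark shell or application, the same statement can be run through `spark.sql(...)`, as mentioned above. A minimal sketch:

```scala
// Run the same CREATE TABLE through a running session's SQL interface.
spark.sql("CREATE TABLE local.db.table (id bigint, data string) USING iceberg")
```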
Iceberg catalogs support the full range of SQL DDL commands (a small `ALTER TABLE` sketch follows this list), including:

* `CREATE TABLE ... PARTITIONED BY`
* `CREATE TABLE ... AS SELECT`
* `ALTER TABLE`
* `DROP TABLE`
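For instance, schema evolution is a plain `ALTER TABLE`. A minimal sketch, run through `spark.sql(...)` against the table created above (the column name is illustrative):

```scala
// Add a new optional column to the table's schema.
spark.sql("ALTER TABLE local.db.table ADD COLUMN category string")
```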
Once your table is created, insert data using `INSERT INTO`:
```sql
INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;
```
Iceberg also adds row-level SQL updates to Spark, `MERGE INTO` and `DELETE FROM`:
```sql
MERGE INTO local.db.target t USING (SELECT * FROM updates) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
WHEN NOT MATCHED THEN INSERT *
```
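`DELETE FROM` works the same way. A minimal sketch, run through `spark.sql(...)` with an illustrative predicate:

```scala
// Delete the rows that match the predicate.
spark.sql("DELETE FROM local.db.table WHERE id = 3")
```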
Iceberg supports writing DataFrames using the new v2 DataFrame write API:
spark.table("source").select("id", "data") .writeTo("local.db.table").append()
The old `write` API is supported, but not recommended.
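For contrast, a minimal sketch of what that older v1 `write` path looks like with the Iceberg source, assuming the same source and target tables as above (prefer `writeTo` as shown before):

```scala
// Old DataFrameWriter (v1) path; kept for compatibility, not recommended.
spark.table("source").select("id", "data")
  .write.format("iceberg").mode("append")
  .save("local.db.table")
```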
To read with SQL, use an Iceberg table name in a `SELECT` query:

```sql
SELECT count(1) as count, data
FROM local.db.table
GROUP BY data
```
SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the `snapshots` metadata table:
```sql
SELECT * FROM local.db.table.snapshots
```

```text
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
| committed_at            | snapshot_id    | parent_id | operation | manifest_list                                      | ... |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
| 2019-02-08 03:29:51.215 | 57897183625154 | null      | append    | s3://.../table/metadata/snap-57897183625154-1.avro | ... |
|                         |                |           |           |                                                    | ... |
|                         |                |           |           |                                                    | ... |
| ...                     | ...            | ...       | ...       | ...                                                | ... |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-----+
```
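Metadata tables can be queried from a shell session in the same way. A minimal sketch via `spark.sql(...)`:

```scala
// Inspect the table's snapshot history without truncating long columns.
spark.sql("SELECT * FROM local.db.table.snapshots").show(truncate = false)
```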
DataFrame reads are supported and can now reference tables by name using `spark.table`:
```scala
val df = spark.table("local.db.table")
df.count()
```
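The grouped count from the SQL example above can be expressed the same way with DataFrame operations. A minimal sketch:

```scala
// Equivalent of: SELECT count(1) as count, data FROM local.db.table GROUP BY data
spark.table("local.db.table").groupBy("data").count().show()
```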
Next, you can learn more about Iceberg tables in Spark:
* `CREATE`, `ALTER`, and `DROP`
* `SELECT` queries and metadata tables
* `INSERT INTO` and `MERGE INTO`