Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. Spark DSv2 is an evolving API with different levels of support in Spark versions.
| Feature support         | Spark 3.0 | Spark 2.4 | Notes |
|-------------------------|-----------|-----------|-------|
| SQL create table        | ✔️        |           |       |
| SQL create table as     | ✔️        |           |       |
| SQL replace table as    | ✔️        |           |       |
| SQL alter table         | ✔️        |           | ⚠ Requires extensions enabled to update partition field and sort order |
| SQL drop table          | ✔️        |           |       |
| SQL select              | ✔️        |           |       |
| SQL insert into         | ✔️        |           |       |
| SQL insert overwrite    | ✔️        |           |       |
| DataFrame reads         | ✔️        | ✔️        |       |
| DataFrame append        | ✔️        | ✔️        |       |
| DataFrame overwrite     | ✔️        | ✔️        | ⚠ Behavior changed in Spark 3.0 |
| DataFrame CTAS and RTAS | ✔️        |           |       |
| Metadata tables         | ✔️        | ✔️        |       |
To enable Iceberg SQL extensions, set the Spark configuration `spark.sql.extensions` to `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`.
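For example, the extension can be set when building the Spark session (a minimal sketch; the application name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session setup; adjust the app name and other settings to your environment.
val spark = SparkSession.builder()
  .appName("iceberg-example")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()
```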
Spark 3.0 adds an API to plug in table catalogs that are used to load, create, and manage Iceberg tables. Spark catalogs are configured by setting Spark properties under `spark.sql.catalog`.
This creates an Iceberg catalog named `hive_prod` that loads tables from a Hive metastore:
```
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml
```
Iceberg also supports a directory-based catalog in HDFS that can be configured using `type=hadoop`:
```
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
```
!!! Note The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a session catalog.
Catalog names are used in SQL queries to identify a table. In the examples above, `hive_prod` and `hadoop_prod` can be used to prefix database and table names that will be loaded from those catalogs.
```sql
SELECT * FROM hive_prod.db.table -- load db.table from catalog hive_prod
```
Spark 3 keeps track of the current catalog and namespace, which can be omitted from table names.
```sql
USE hive_prod.db;
SELECT * FROM table -- load db.table from catalog hive_prod
```
To see the current catalog and namespace, run `SHOW CURRENT NAMESPACE`.
To add Iceberg table support to Spark's built-in catalog, configure `spark_catalog` to use Iceberg's `SparkSessionCatalog`.
```
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
```
Spark's built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg's `SparkSessionCatalog` as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog is used to load it instead. This configuration can use the same Hive Metastore for both Iceberg and non-Iceberg tables.
Spark supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property. When `catalog-impl` is set, the value of `type` is ignored. Here is an example:
```
spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value
```
!!! Note Spark 2.4 can't create Iceberg tables with DDL; instead, use the Iceberg API.
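As a hedged sketch of what the Iceberg API path can look like from Spark 2.4 (the schema and table location are placeholders, and a Hadoop-path table is assumed):

```scala
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.types.Types

// Hypothetical schema and location; adjust to your environment.
val schema = new Schema(
  Types.NestedField.required(1, "id", Types.LongType.get()),
  Types.NestedField.optional(2, "data", Types.StringType.get())
)
val spec = PartitionSpec.unpartitioned()

// Create a path-based table using the Iceberg HadoopTables API.
val tables = new HadoopTables(spark.sparkContext.hadoopConfiguration)
tables.create(schema, spec, "hdfs://nn:8020/warehouse/path/db/table")
```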
CREATE TABLE
Spark 3.0 can create tables in any Iceberg catalog with the clause `USING iceberg`:
```sql
CREATE TABLE prod.db.sample (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg
```
Iceberg will convert the Spark column types to the corresponding Iceberg types. See the type compatibility section below for details on creating tables.
Table create commands, including CTAS and RTAS, support the full range of Spark create clauses, including:

- `PARTITIONED BY (partition-expressions)` to configure partitioning
- `LOCATION '(fully-qualified-uri)'` to set the table location
- `COMMENT 'table documentation'` to set a table description
- `TBLPROPERTIES ('key'='value', ...)` to set table configuration

Create commands may also set the default format with the `USING` clause. This is only supported for `SparkCatalog` because Spark handles the `USING` clause differently for the built-in catalog.
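For illustration, a hedged sketch combining these clauses in one statement run through `spark.sql`; the location, comment, and property values are placeholders:

```scala
// Hypothetical example of the create clauses listed above; values are placeholders.
spark.sql("""
  CREATE TABLE prod.db.sample (
      id bigint COMMENT 'unique id',
      data string)
  USING iceberg
  PARTITIONED BY (bucket(16, id))
  LOCATION 'hdfs://nn:8020/warehouse/path/db/sample'
  COMMENT 'sample table used in the examples on this page'
  TBLPROPERTIES ('write.format.default'='parquet')
""")
```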
PARTITIONED BY
To create a partitioned table, use `PARTITIONED BY`:
```sql
CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category)
```
The `PARTITIONED BY` clause supports transform expressions to create hidden partitions.
```sql
CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category)
```
Supported partition transforms are:

- `years` for yearly partitions
- `months` for monthly partitions
- `days` for daily partitions
- `hours` for hourly partitions
- `bucket` for bucketing (with width)
- `truncate` to truncate integers or strings (with length)

CREATE TABLE ... AS SELECT
Iceberg supports CTAS as an atomic operation when using a `SparkCatalog`. CTAS is supported, but is not atomic, when using `SparkSessionCatalog`.
```sql
CREATE TABLE prod.db.sample
USING iceberg
AS SELECT ...
```
REPLACE TABLE ... AS SELECT
Iceberg supports RTAS as an atomic operation when using a `SparkCatalog`. RTAS is supported, but is not atomic, when using `SparkSessionCatalog`.
Atomic table replacement creates a new snapshot with the results of the `SELECT` query, but keeps table history.
```sql
REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...
```
```sql
CREATE OR REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...
```
The schema and partition spec will be replaced if changed. To avoid modifying the table's schema and partitioning, use `INSERT OVERWRITE` instead of `REPLACE TABLE`. The new table properties in the `REPLACE TABLE` command will be merged with any existing table properties. Existing table properties are updated if they are changed; otherwise they are preserved.
ALTER TABLE
Iceberg has full `ALTER TABLE` support in Spark 3, including renaming a table, setting or removing table properties, adding, renaming, and dropping columns and nested fields, reordering fields, widening the type of `int`, `float`, and `decimal` fields, and making required fields optional.

ALTER TABLE ... RENAME TO
```sql
ALTER TABLE prod.db.sample RENAME TO prod.db.new_name
```
ALTER TABLE ... SET TBLPROPERTIES
```sql
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
    'read.split.target-size'='268435456'
)
```
Iceberg uses table properties to control table behavior. For a list of available properties, see Table configuration.
`UNSET` is used to remove properties:
```sql
ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size')
```
ALTER TABLE ... ADD COLUMN
```sql
ALTER TABLE prod.db.sample
ADD COLUMN point struct<x: double NOT NULL, y: double NOT NULL> AFTER data

ALTER TABLE prod.db.sample
ADD COLUMN point.z double FIRST
```
ALTER TABLE ... RENAME COLUMN
```sql
ALTER TABLE prod.db.sample RENAME COLUMN data TO payload
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude
```
Note that nested rename commands only rename the leaf field. The above command renames `location.lat` to `location.latitude`.
ALTER TABLE ... ALTER COLUMN
Alter column is used to widen types, make a field optional, set comments, and reorder fields.
```sql
ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL
ALTER TABLE prod.db.sample ALTER COLUMN location.lat TYPE double
ALTER TABLE prod.db.sample ALTER COLUMN point.z AFTER y
ALTER TABLE prod.db.sample ALTER COLUMN id COMMENT 'unique id'
```
ALTER TABLE ... DROP COLUMN
```sql
ALTER TABLE prod.db.sample DROP COLUMN id
ALTER TABLE prod.db.sample DROP COLUMN point.z
```
ALTER TABLE ... ADD PARTITION FIELD
```sql
ALTER TABLE prod.db.sample ADD PARTITION FIELD catalog -- identity transform
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id)
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(data, 4)
ALTER TABLE prod.db.sample ADD PARTITION FIELD years(ts)

-- use optional AS keyword to specify a custom name for the partition field
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id) AS shard
```
!!! Warning Changing partitioning will change the behavior of dynamic writes, which overwrite any partition that is written to. For example, if you partition by days and move to partitioning by hours, overwrites will overwrite hourly partitions but not days anymore.
ALTER TABLE ... DROP PARTITION FIELD
```sql
ALTER TABLE prod.db.sample DROP PARTITION FIELD catalog
ALTER TABLE prod.db.sample DROP PARTITION FIELD bucket(16, id)
ALTER TABLE prod.db.sample DROP PARTITION FIELD truncate(data, 4)
ALTER TABLE prod.db.sample DROP PARTITION FIELD years(ts)
ALTER TABLE prod.db.sample DROP PARTITION FIELD shard
```
!!! Warning Changing partitioning will change the behavior of dynamic writes, which overwrite any partition that is written to. For example, if you partition by days and move to partitioning by hours, overwrites will overwrite hourly partitions but not days anymore.
ALTER TABLE ... WRITE ORDERED BY
```sql
ALTER TABLE prod.db.sample WRITE ORDERED BY category, id

-- use optional ASC/DESC keyword to specify sort order of each field (default ASC)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC

-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST
```
DROP TABLE
To delete a table, run:
```sql
DROP TABLE prod.db.sample
```
In Spark 3, tables use identifiers that include a catalog name.
```sql
SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table
```
Metadata tables, like `history` and `snapshots`, can use the Iceberg table name as a namespace.
For example, to read from the `files` metadata table for `prod.db.table`, run:
```sql
SELECT * FROM prod.db.table.files
```
To load a table as a DataFrame, use `table`:
```scala
val df = spark.table("prod.db.table")
```
!!! Warning When reading with DataFrames in Spark 3, use `table` to load a table by name from a catalog unless `option` is also required. Using `format("iceberg")` loads an isolated table reference that is not refreshed when other queries update the table.
To select a specific table snapshot or the snapshot at some time, Iceberg supports two Spark read options:

- `snapshot-id` selects a specific table snapshot
- `as-of-timestamp` selects the current snapshot at a timestamp, in milliseconds

```scala
// time travel to October 26, 1986 at 01:21:00
spark.read
    .option("as-of-timestamp", "499162860000")
    .format("iceberg")
    .load("path/to/table")
```
```scala
// time travel to snapshot with ID 10963874102873L
spark.read
    .option("snapshot-id", 10963874102873L)
    .format("iceberg")
    .load("path/to/table")
```
!!! Note Spark does not currently support using `option` with `table` in DataFrameReader commands. All options will be silently ignored. Do not use `table` when attempting to time-travel or use other options. Options will be supported with `table` in Spark 3.1 - SPARK-32592.
Time travel is not yet supported by Spark's SQL syntax.
Paths and table names can be loaded from the Spark 3 DataFrame interface. How paths/tables are loaded depends on how the identifier is specified. When using `spark.read().format("iceberg").load(table)` or `spark.table(table)`, the `table` variable can take a number of forms as listed below:

- `file:/path/to/table` -> loads a HadoopTable at the given path
- `tablename` -> loads `currentCatalog.currentNamespace.tablename`
- `catalog.tablename` -> loads `tablename` from the specified catalog
- `namespace.tablename` -> loads `namespace.tablename` from the current catalog
- `catalog.namespace.tablename` -> loads `namespace.tablename` from the specified catalog
- `namespace1.namespace2.tablename` -> loads `namespace1.namespace2.tablename` from the current catalog

The above list is in order of priority. For example, a matching catalog will take priority over any namespace resolution.
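A hedged sketch of a few of these identifier forms; the catalog, namespace, and table names are placeholders:

```scala
// Hypothetical identifiers; substitute your own catalog, namespace, and table names.
val byPath    = spark.read.format("iceberg").load("hdfs://nn:8020/warehouse/path/db/table")
val byCatalog = spark.table("prod.db.table") // catalog.namespace.tablename
val byCurrent = spark.table("db.table")      // namespace.tablename from the current catalog
```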
Spark 2.4 requires using the DataFrame reader with `iceberg` as a format, because 2.4 does not support catalogs:
```scala
// named metastore table
spark.read.format("iceberg").load("db.table")
// Hadoop path table
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")
```
To run SQL `SELECT` statements on Iceberg tables in 2.4, register the DataFrame as a temporary table:
```scala
val df = spark.read.format("iceberg").load("db.table")
df.createOrReplaceTempView("table")
spark.sql("""select count(1) from table""").show()
```
Spark 3 supports SQL `INSERT INTO` and `INSERT OVERWRITE`, as well as the new `DataFrameWriterV2` API.
INSERT INTO
To append new data to a table, use `INSERT INTO`.
```sql
INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b')

INSERT INTO prod.db.table SELECT ...
```
INSERT OVERWRITE
To replace data in the table with the result of a query, use `INSERT OVERWRITE`. Overwrites are atomic operations for Iceberg tables.
The partitions that will be replaced by `INSERT OVERWRITE` depend on Spark's partition overwrite mode and the partitioning of the table.
!!! Warning Spark 3.0.0 has a correctness bug that affects dynamic `INSERT OVERWRITE` with hidden partitioning, SPARK-32168. For tables with hidden partitions, wait for Spark 3.0.1.
Spark's default overwrite mode is static, but dynamic overwrite mode is recommended when writing to Iceberg tables. Static overwrite mode determines which partitions to overwrite in a table by converting the `PARTITION` clause to a filter, but the `PARTITION` clause can only reference table columns.
Dynamic overwrite mode is configured by setting `spark.sql.sources.partitionOverwriteMode=dynamic`.
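For example, the mode can be set on the active session before writing (a minimal sketch):

```scala
// Enable dynamic partition overwrite for subsequent INSERT OVERWRITE statements.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
```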
To demonstrate the behavior of dynamic and static overwrites, consider a `logs` table defined by the following DDL:
```sql
CREATE TABLE prod.my_app.logs (
    uuid string NOT NULL,
    level string NOT NULL,
    ts timestamp NOT NULL,
    message string)
USING iceberg
PARTITIONED BY (level, hours(ts))
```
When Spark's overwrite mode is dynamic, partitions that have rows produced by the `SELECT` query will be replaced.
For example, this query removes duplicate log events from the example `logs` table.
```sql
INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid
```
In dynamic mode, this will replace any partition with rows in the `SELECT` result. Because the date of all rows is restricted to 1 July, only hours of that day will be replaced.
When Spark's overwrite mode is static, the `PARTITION` clause is converted to a filter that is used to delete from the table. If the `PARTITION` clause is omitted, all partitions will be replaced.
Because there is no `PARTITION` clause in the query above, it will drop all existing rows in the table when run in static mode, but will only write the logs from 1 July.
To overwrite just the partitions that were loaded, add a `PARTITION` clause that aligns with the `SELECT` query filter:
```sql
INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid
```
Note that this mode cannot replace hourly partitions like the dynamic example query because the `PARTITION` clause can only reference table columns, not hidden partitions.
DELETE FROM
Spark 3 added support for `DELETE FROM` queries to remove data from tables.
Delete queries accept a filter to match rows to delete. Iceberg can delete data as long as the filter matches entire partitions of the table, or it can determine that all rows of a file match. If a file contains some rows that should be deleted and some that should not, Iceberg will throw an exception.
```sql
DELETE FROM prod.db.table
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'
```
Spark 3 introduced the new `DataFrameWriterV2` API for writing to tables using data frames. The v2 API is recommended for several reasons:

- Hidden partition expressions are supported in `partitionedBy`
- `df.writeTo(t).create()` is equivalent to `CREATE TABLE AS SELECT`
- `df.writeTo(t).replace()` is equivalent to `REPLACE TABLE AS SELECT`
- `df.writeTo(t).append()` is equivalent to `INSERT INTO`
- `df.writeTo(t).overwritePartitions()` is equivalent to dynamic `INSERT OVERWRITE`
The v1 DataFrame `write` API is still supported, but is not recommended.
!!! Warning When writing with the v1 DataFrame API in Spark 3, use `saveAsTable` or `insertInto` to load tables with a catalog. Using `format("iceberg")` loads an isolated table reference that will not automatically refresh tables used by queries.
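As a hedged sketch, appending with the v1 API through the catalog might look like this (the table name is a placeholder):

```scala
// v1 API: insertInto resolves the table through the session catalog,
// so the catalog-tracked table reference stays up to date.
data.write.insertInto("prod.db.table")
```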
To append a dataframe to an Iceberg table, use `append`:
```scala
val data: DataFrame = ...
data.writeTo("prod.db.table").append()
```
In Spark 2.4, use the v1 API with `append` mode and `iceberg` format:
```scala
data.write
    .format("iceberg")
    .mode("append")
    .save("db.table")
```
To overwrite partitions dynamically, use `overwritePartitions()`:
```scala
val data: DataFrame = ...
data.writeTo("prod.db.table").overwritePartitions()
```
To explicitly overwrite partitions, use `overwrite` to supply a filter:
data.writeTo("prod.db.table").overwrite($"level" === "INFO")
In Spark 2.4, overwrite values in an Iceberg table with `overwrite` mode and `iceberg` format:
```scala
data.write
    .format("iceberg")
    .mode("overwrite")
    .save("db.table")
```
!!! Warning The behavior of overwrite mode changed between Spark 2.4 and Spark 3.
The behavior of DataFrameWriter overwrite mode was undefined in Spark 2.4, but is required to overwrite the entire table in Spark 3. Because of this new requirement, the Iceberg source's behavior changed in Spark 3. In Spark 2.4, the behavior was to dynamically overwrite partitions. To use the Spark 2.4 behavior, add option `overwrite-mode=dynamic`.
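A hedged sketch of restoring the Spark 2.4 behavior with that option (the table name is a placeholder):

```scala
// Dynamic overwrite with the v1 API; only partitions with rows in `data` are replaced.
data.write
    .format("iceberg")
    .mode("overwrite")
    .option("overwrite-mode", "dynamic")
    .save("db.table")
```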
To run a CTAS or RTAS, use the `create`, `replace`, or `createOrReplace` operations:
```scala
val data: DataFrame = ...
data.writeTo("prod.db.table").create()
```
Create and replace operations support table configuration methods, like `partitionedBy` and `tableProperty`:
data.writeTo("prod.db.table") .tableProperty("write.format.default", "orc") .partitionBy($"level", days($"ts")) .createOrReplace()
Iceberg requires the data to be sorted according to the partition spec per task (Spark partition) prior to writing to a partitioned table. This applies to both writing with SQL and writing with DataFrames.
!!! Note An explicit sort is necessary because Spark doesn't allow Iceberg to request a sort before writing as of Spark 3.0. SPARK-23889 is filed to enable Iceberg to require a specific distribution and sort order from Spark.
!!! Note Both global sort (`orderBy`/`sort`) and local sort (`sortWithinPartitions`) satisfy this requirement.
Let's walk through writing data to the sample table below:
```sql
CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (days(ts), category)
```
To write data to the sample table, your data needs to be sorted by `days(ts), category`.
If you're inserting data with a SQL statement, you can use `ORDER BY` to achieve it, like below:
```sql
INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category
```
If you're inserting data with a DataFrame, you can use either `orderBy`/`sort` to trigger a global sort, or `sortWithinPartitions` to trigger a local sort. A local sort, for example:
data.sortWithinPartitions("ts", "category") .writeTo("prod.db.sample") .append()
You can simply add the original column to the sort condition for most partition transforms, except `bucket`.
For the `bucket` partition transform, you need to register the Iceberg transform function in Spark to specify it during the sort.
Let's walk through another sample table with a bucket partition:
```sql
CREATE TABLE prod.db.sample (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id))
```
You need to register the function that handles the bucket transform, like below:
```scala
import org.apache.iceberg.spark.IcebergSpark
import org.apache.spark.sql.types.DataTypes

IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
```
!!! Note Explicit registration of the function is necessary because Spark doesn't allow Iceberg to provide functions. SPARK-27658 is filed to enable Iceberg to provide functions that can be used in queries.
Here we just registered the bucket function as `iceberg_bucket16`, which can be used in the sort clause.
If you're inserting data with a SQL statement, you can use the function like below:
```sql
INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY iceberg_bucket16(id)
```
If you're inserting data with a DataFrame, you can use the function like below:
```scala
import org.apache.spark.sql.functions.expr

data.sortWithinPartitions(expr("iceberg_bucket16(id)"))
    .writeTo("prod.db.sample")
    .append()
```
To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.
Metadata tables are identified by adding the metadata table name after the original table name. For example, history for `db.table` is read using `db.table.history`.
!!! Note As of Spark 3.0, the format of the table name for inspection (`catalog.database.table.metadata`) doesn't work with Spark's default catalog (`spark_catalog`). If you've replaced the default catalog, you may want to use the DataFrameReader API to inspect the table.
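For example, a minimal sketch of reading the history metadata table through the DataFrameReader (the table name is a placeholder):

```scala
// Works even when spark_catalog has been replaced with an Iceberg session catalog.
spark.read.format("iceberg").load("db.table.history").show(truncate = false)
```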
To show table history, run:
```sql
SELECT * FROM prod.db.table.history
```
```
+-------------------------+---------------------+---------------------+---------------------+
| made_current_at         | snapshot_id         | parent_id           | is_current_ancestor |
+-------------------------+---------------------+---------------------+---------------------+
| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL                | true                |
| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true                |
| 2019-02-09 16:24:30.13  | 296410040247533544  | 5179299526185056830 | false               |
| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true                |
| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true                |
| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true                |
+-------------------------+---------------------+---------------------+---------------------+
```
!!! Note This shows a commit that was rolled back. The example has two snapshots with the same parent, and one is not an ancestor of the current table state.
To show the valid snapshots for a table, run:
```sql
SELECT * FROM prod.db.table.snapshots
```
```
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-------------------------------------------------------+
| committed_at            | snapshot_id    | parent_id | operation | manifest_list                                      | summary                                                |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-------------------------------------------------------+
| 2019-02-08 03:29:51.215 | 57897183625154 | null      | append    | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, |
|                         |                |           |           |                                                    |   added-data-files -> 438, total-data-files -> 438,   |
|                         |                |           |           |                                                    |   spark.app.id -> application_1520379288616_155055 }  |
| ...                     | ...            | ...       | ...       | ...                                                | ...                                                    |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-------------------------------------------------------+
```
You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:
```sql
select
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary['spark.app.id']
from prod.db.table.history h
join prod.db.table.snapshots s
  on h.snapshot_id = s.snapshot_id
order by made_current_at
```
```
+-------------------------+-----------+----------------+---------------------+----------------------------------+
| made_current_at         | operation | snapshot_id    | is_current_ancestor | summary[spark.app.id]            |
+-------------------------+-----------+----------------+---------------------+----------------------------------+
| 2019-02-08 03:29:51.215 | append    | 57897183625154 | true                | application_1520379288616_155055 |
| 2019-02-09 16:24:30.13  | delete    | 29641004024753 | false               | application_1520379288616_151109 |
| 2019-02-09 16:32:47.336 | append    | 57897183625154 | true                | application_1520379288616_155055 |
| 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true                | application_1520379288616_152431 |
+-------------------------+-----------+----------------+---------------------+----------------------------------+
```
To show a table's file manifests and each file's metadata, run:
```sql
SELECT * FROM prod.db.table.manifests
```
```
+----------------------------------------------------------------------+--------+-------------------+---------------------+------------------------+---------------------------+--------------------------+---------------------------------+
| path                                                                 | length | partition_spec_id | added_snapshot_id   | added_data_files_count | existing_data_files_count | deleted_data_files_count | partitions                      |
+----------------------------------------------------------------------+--------+-------------------+---------------------+------------------------+---------------------------+--------------------------+---------------------------------+
| s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479   | 0                 | 6668963634911763636 | 8                      | 0                         | 0                        | [[false,2019-05-13,2019-05-15]] |
+----------------------------------------------------------------------+--------+-------------------+---------------------+------------------------+---------------------------+--------------------------+---------------------------------+
```
To show a table's data files and each file's metadata, run:
```sql
SELECT * FROM prod.db.table.files
```
```
+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+-----------------+-----------------+--------------+---------------+
| file_path                                                               | file_format | record_count | file_size_in_bytes | column_sizes       | value_counts     | null_value_counts | lower_bounds    | upper_bounds    | key_metadata | split_offsets |
+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+-----------------+-----------------+--------------+---------------+
| s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET     | 1            | 597                | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0]  | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null         | [4]           |
| s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET     | 1            | 597                | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0]  | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null         | [4]           |
| s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET     | 1            | 597                | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0]  | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null         | [4]           |
+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+-----------------+-----------------+--------------+---------------+
```
Metadata tables can be loaded in Spark 2.4 or Spark 3 using the DataFrameReader API:
```scala
// named metastore table
spark.read.format("iceberg").load("db.table.files").show(truncate = false)
// Hadoop path table
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table#files").show(truncate = false)
```
Spark and Iceberg support different sets of types. Iceberg does the type conversion automatically, but not for all combinations, so you may want to understand the type conversion in Iceberg before designing the column types of your tables.
This type conversion table describes how Spark types are converted to Iceberg types. The conversion applies both when creating an Iceberg table and when writing to an Iceberg table via Spark.
| Spark     | Iceberg                 | Notes |
|-----------|-------------------------|-------|
| boolean   | boolean                 |       |
| short     | integer                 |       |
| byte      | integer                 |       |
| integer   | integer                 |       |
| long      | long                    |       |
| float     | float                   |       |
| double    | double                  |       |
| date      | date                    |       |
| timestamp | timestamp with timezone |       |
| char      | string                  |       |
| varchar   | string                  |       |
| string    | string                  |       |
| binary    | binary                  |       |
| decimal   | decimal                 |       |
| struct    | struct                  |       |
| array     | list                    |       |
| map       | map                     |       |
!!! Note The table above shows the conversions applied when creating a table. Broader conversions are supported on write:

* Iceberg numeric types (`integer`, `long`, `float`, `double`, `decimal`) support promotion during writes. For example, you can write Spark types `short`, `byte`, `integer`, and `long` to the Iceberg type `long`.
* You can write to the Iceberg `fixed` type using the Spark `binary` type. Note that the length will be asserted.
This type conversion table describes how Iceberg types are converted to Spark types. The conversion applies when reading from an Iceberg table via Spark.
| Iceberg                    | Spark         | Note |
|----------------------------|---------------|------|
| boolean                    | boolean       |      |
| integer                    | integer       |      |
| long                       | long          |      |
| float                      | float         |      |
| double                     | double        |      |
| date                       | date          |      |
| time                       | Not supported |      |
| timestamp with timezone    | timestamp     |      |
| timestamp without timezone | Not supported |      |
| string                     | string        |      |
| uuid                       | string        |      |
| fixed                      | binary        |      |
| binary                     | binary        |      |
| decimal                    | decimal       |      |
| struct                     | struct        |      |
| list                       | array         |      |
| map                        | map           |      |