---
title: "Queries"
url: spark-queries
aliases:
    - "spark/spark-queries"
menu:
    main:
        parent: Spark
        weight: 0
---

# Spark Queries

To use Iceberg in Spark, first configure Spark catalogs.
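For example, a catalog named `prod` (the name used in the examples on this page) can be defined when building the session. This is a minimal sketch only; the `hive` catalog type and the metastore URI are placeholder assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: register an Iceberg catalog called "prod" backed by a Hive metastore.
// Replace the URI with your metastore's address.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.prod.type", "hive")
  .config("spark.sql.catalog.prod.uri", "thrift://metastore-host:9083")
  .getOrCreate()
```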

Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. Spark DSv2 is an evolving API with different levels of support in Spark versions:

| Feature support            | Spark 3.0 | Spark 2.4 | Notes |
|----------------------------|-----------|-----------|-------|
| `SELECT`                   | ✔️        |           |       |
| DataFrame reads            | ✔️        | ✔️        |       |
| Metadata table `SELECT`    | ✔️        |           |       |
| History metadata table     | ✔️        | ✔️        |       |
| Snapshots metadata table   | ✔️        | ✔️        |       |
| Files metadata table       | ✔️        | ✔️        |       |
| Manifests metadata table   | ✔️        | ✔️        |       |
| Partitions metadata table  | ✔️        | ✔️        |       |
| All metadata tables        | ✔️        | ✔️        |       |

## Querying with SQL

In Spark 3, tables use identifiers that include a catalog name.

```sql
SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table
```

Metadata tables, like `history` and `snapshots`, can use the Iceberg table name as a namespace.

For example, to read from the `files` metadata table for `prod.db.table`:

```sql
SELECT * FROM prod.db.table.files
```

| content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null |
| 0 | s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null |
| 0 | s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null |

## Querying with DataFrames

To load a table as a DataFrame, use `table`:

```scala
val df = spark.table("prod.db.table")
```

### Catalogs with DataFrameReader

Iceberg 0.11.0 adds multi-catalog support to `DataFrameReader` in both Spark 3.x and 2.4.

Paths and table names can be loaded with Spark's `DataFrameReader` interface. How tables are loaded depends on how the identifier is specified. When using `spark.read.format("iceberg").load(table)` or `spark.table(table)`, the `table` variable can take a number of forms as listed below:

- `file:/path/to/table`: loads a HadoopTable at the given path
- `tablename`: loads `currentCatalog.currentNamespace.tablename`
- `catalog.tablename`: loads `tablename` from the specified catalog
- `namespace.tablename`: loads `namespace.tablename` from the current catalog
- `catalog.namespace.tablename`: loads `namespace.tablename` from the specified catalog
- `namespace1.namespace2.tablename`: loads `namespace1.namespace2.tablename` from the current catalog

The above list is in order of priority. For example, a matching catalog takes priority over any namespace resolution.
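As a sketch of these resolution rules in practice (the catalog and table names here are illustrative, not part of the API):

```scala
// catalog.namespace.tablename: "db.table" resolved inside the "prod" catalog
spark.read.format("iceberg").load("prod.db.table")

// namespace.tablename: resolved inside the current catalog
spark.read.format("iceberg").load("db.table")

// file:/ path: loads a Hadoop table directly from the file system, no catalog involved
spark.read.format("iceberg").load("file:/path/to/table")
```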

## Time travel

### SQL

Spark 3.3 and later support time travel in SQL queries using the `TIMESTAMP AS OF` or `VERSION AS OF` clauses:

```sql
-- time travel to October 26, 1986 at 01:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

-- time travel to snapshot with id 10963874102873L
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
```

In addition, the `FOR SYSTEM_TIME AS OF` and `FOR SYSTEM_VERSION AS OF` clauses are supported:

```sql
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00';
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 10963874102873;
```

Timestamps may also be supplied as a Unix timestamp, in seconds:

```sql
-- timestamp in seconds
SELECT * FROM prod.db.table TIMESTAMP AS OF 499162860;
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF 499162860;
```

### DataFrame

To select a specific table snapshot or the snapshot at some time in the DataFrame API, Iceberg supports two Spark read options:

- `snapshot-id` selects a specific table snapshot
- `as-of-timestamp` selects the current snapshot at a timestamp, in milliseconds

```scala
// time travel to October 26, 1986 at 01:21:00
spark.read
    .option("as-of-timestamp", "499162860000")
    .format("iceberg")
    .load("path/to/table")

// time travel to snapshot with ID 10963874102873L
spark.read
    .option("snapshot-id", 10963874102873L)
    .format("iceberg")
    .load("path/to/table")
```

{{< hint info >}} Spark 3.0 and earlier versions do not support using `option` with `table` in DataFrameReader commands. All options will be silently ignored. Do not use `table` when attempting to time-travel or use other options. See SPARK-32592. {{< /hint >}}

## Incremental read

To read appended data incrementally, use:

- `start-snapshot-id`: start snapshot ID used in incremental scans (exclusive)
- `end-snapshot-id`: end snapshot ID used in incremental scans (inclusive). This is optional; omitting it will default to the current snapshot.

```java
// get the data added after start-snapshot-id (10963874102873L) until end-snapshot-id (63874143573109L)
spark.read()
  .format("iceberg")
  .option("start-snapshot-id", "10963874102873")
  .option("end-snapshot-id", "63874143573109")
  .load("path/to/table")
```

{{< hint info >}} Incremental read currently returns only the data written by `append` operations; `replace`, `overwrite`, and `delete` operations are not supported. Incremental read works with both V1 and V2 format-version tables. Incremental read is not supported by Spark's SQL syntax. {{< /hint >}}

## Spark 2.4

Spark 2.4 requires using the DataFrame reader with `iceberg` as a format, because 2.4 does not support direct SQL queries:

```scala
// named metastore table
spark.read.format("iceberg").load("catalog.db.table")

// Hadoop path table
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")
```

### Spark 2.4 with SQL

To run SQL SELECT statements on Iceberg tables in 2.4, register the DataFrame as a temporary table:

```scala
val df = spark.read.format("iceberg").load("db.table")
df.createOrReplaceTempView("table")

spark.sql("""select count(1) from table""").show()
```

## Inspecting tables

To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.

Metadata tables are identified by adding the metadata table name after the original table name. For example, `history` for `db.table` is read using `db.table.history`.

{{< hint info >}} For Spark 2.4, use the DataFrameReader API to inspect tables.

For Spark 3, prior to 3.2, the Spark session catalog does not support table names with multipart identifiers such as `catalog.database.table.metadata`. As a workaround, configure an `org.apache.iceberg.spark.SparkCatalog`, or use the Spark `DataFrameReader` API. {{< /hint >}}

### History

To show table history:

```sql
SELECT * FROM prod.db.table.history
```

| made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|
| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true |
| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |

{{< hint info >}} This shows a commit that was rolled back. The example has two snapshots with the same parent, and one is not an ancestor of the current table state. {{< /hint >}}

### Metadata Log Entries

To show table metadata log entries:

```sql
SELECT * FROM prod.db.table.metadata_log_entries
```

| timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number |
|---|---|---|---|---|
| 2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null |
| 2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 |
| 2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 |

### Snapshots

To show the valid snapshots for a table:

```sql
SELECT * FROM prod.db.table.snapshots
```

| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
|---|---|---|---|---|---|
| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, spark.app.id -> application_1520379288616_155055 } |

You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:

```sql
select
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary['spark.app.id']
from prod.db.table.history h
join prod.db.table.snapshots s
  on h.snapshot_id = s.snapshot_id
order by made_current_at
```

| made_current_at | operation | snapshot_id | is_current_ancestor | summary[spark.app.id] |
|---|---|---|---|---|
| 2019-02-08 03:29:51.215 | append | 57897183625154 | true | application_1520379288616_155055 |
| 2019-02-09 16:24:30.13 | delete | 29641004024753 | false | application_1520379288616_151109 |
| 2019-02-09 16:32:47.336 | append | 57897183625154 | true | application_1520379288616_155055 |
| 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true | application_1520379288616_152431 |

### Files

To show a table's current data files:

```sql
SELECT * FROM prod.db.table.files
```

| content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null |
| 0 | s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null |
| 0 | s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null |

### Manifests

To show a table's current file manifests:

```sql
SELECT * FROM prod.db.table.manifests
```

| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
|---|---|---|---|---|---|---|---|
| s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,null,2019-05-13,2019-05-15]] |

Note:

1. Fields within the `partition_summaries` column of the manifests table correspond to `field_summary` structs within the manifest list, with the following order (the query sketch after this note shows one way to unpack them):
    - `contains_null`
    - `contains_nan`
    - `lower_bound`
    - `upper_bound`
2. `contains_nan` could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from V1 tables, where `contains_nan` is not populated.
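Since `partition_summaries` is an array of structs, a query like the following sketch can flatten it for inspection (continuing the `prod.db.table` example; `explode` is built-in Spark SQL):

```scala
// Flatten partition_summaries: one output row per manifest per partition field,
// with the field_summary struct members exposed as plain columns.
spark.sql("""
  SELECT path,
         summary.contains_null,
         summary.contains_nan,
         summary.lower_bound,
         summary.upper_bound
  FROM (
    SELECT path, explode(partition_summaries) AS summary
    FROM prod.db.table.manifests
  ) m
""").show(truncate = false)
```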

### Partitions

To show a table's current partitions:

```sql
SELECT * FROM prod.db.table.partitions
```

| partition | record_count | file_count |
|---|---|---|
| {20211001, 11} | 1 | 1 |
| {20211002, 11} | 1 | 1 |
| {20211001, 10} | 1 | 1 |
| {20211002, 10} | 1 | 1 |

### All Metadata Tables

These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.

{{< hint danger >}} The “all” metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot. {{< /hint >}}
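For example, counting rows per file in `all_data_files` (described below) makes this duplication visible. This is a sketch reusing the `prod.db.table` example:

```scala
// Data files referenced by several snapshots appear once per reference,
// so counts greater than 1 are expected here.
spark.sql("""
  SELECT file_path, count(*) AS row_count
  FROM prod.db.table.all_data_files
  GROUP BY file_path
  ORDER BY row_count DESC
""").show(truncate = false)
```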

#### All Data Files

To show all of the table's data files and each file's metadata:

```sql
SELECT * FROM prod.db.table.all_data_files
```

| content | file_path | file_format | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s3://.../dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet | PARQUET | {20210102} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210102} | {1 -> 2, 2 -> 20210102} | null | [4] | null | 0 |
| 0 | s3://.../dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet | PARQUET | {20210103} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210103} | {1 -> 3, 2 -> 20210103} | null | [4] | null | 0 |
| 0 | s3://.../dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet | PARQUET | {20210104} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210104} | {1 -> 3, 2 -> 20210104} | null | [4] | null | 0 |

#### All Manifests

To show all of the table's manifest files:

```sql
SELECT * FROM prod.db.table.all_manifests
```

| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
|---|---|---|---|---|---|---|---|
| s3://.../metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro | 6376 | 0 | 6272782676904868561 | 2 | 0 | 0 | [{false, false, 20210101, 20210101}] |

Note:

1. Fields within the `partition_summaries` column of the manifests table correspond to `field_summary` structs within the manifest list, with the following order:
    - `contains_null`
    - `contains_nan`
    - `lower_bound`
    - `upper_bound`
2. `contains_nan` could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from V1 tables, where `contains_nan` is not populated.

### Inspecting with DataFrames

Metadata tables can be loaded in Spark 2.4 or Spark 3 using the `DataFrameReader` API:

```scala
// named metastore table
spark.read.format("iceberg").load("db.table.files").show(truncate = false)

// Hadoop path table
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table#files").show(truncate = false)
```
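In Spark 3 with an Iceberg catalog configured, the same metadata tables can also be loaded by name through `spark.table` (a sketch; `prod` is the example catalog used throughout this page):

```scala
// Equivalent to: SELECT * FROM prod.db.table.files
spark.table("prod.db.table.files").show(truncate = false)
```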