---
title: "Flink Queries"
url: flink-queries
aliases:
    - "flink/flink-queries"
menu:
    main:
        parent: Flink
        identifier: flink_queries
        weight: 300
---

# Flink Queries

Iceberg supports streaming and batch reads with Apache Flink's DataStream API and Table API.

## Reading with SQL

Iceberg supports both streaming and batch reads in Flink. Execute the following SQL command to switch the execution mode between streaming and batch:

```sql
-- Execute the flink job in streaming mode for current session context
SET execution.runtime-mode = streaming;

-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
```

### Flink batch read

Submit a Flink batch job using the following statements:

```sql
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM sample;
```

### Flink streaming read

Iceberg supports processing incremental data in Flink streaming jobs, starting from a historical snapshot ID:

```sql
-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;

-- Enable this switch because streaming read SQL will provide a few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;

-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
```

There are some options that can be set in Flink SQL hint options for streaming jobs; see the read options section below for details.
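For example, assuming the `starting-strategy` option from the read options table (this is an illustrative sketch, not the full option list), a streaming read can start from the earliest retained snapshot instead of an explicit snapshot ID:

```sql
-- A sketch: start the incremental read from the earliest snapshot the table still retains.
-- 'starting-strategy' is assumed here; check the read options table for the supported values.
SELECT * FROM sample
/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'starting-strategy'='INCREMENTAL_FROM_EARLIEST_SNAPSHOT') */ ;
```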

### FLIP-27 source for SQL

Here are the SQL settings for the FLIP-27 source. All other SQL settings and options documented above are applicable to the FLIP-27 source.

```sql
-- Opt in to the FLIP-27 source. Default is false.
SET table.exec.iceberg.use-flip27-source = true;
```
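Since the settings above still apply, a complete session might combine the opt-in with a streaming read (illustrative, reusing the options shown earlier):

```sql
-- Illustrative session: enable the FLIP-27 source, then run a streaming read as before
SET table.exec.iceberg.use-flip27-source = true;
SET execution.runtime-mode = streaming;
SET table.dynamic-table-options.enabled = true;
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */ ;
```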

### Reading branches and tags with SQL

Branches and tags can be read via SQL by specifying options. For more details, refer to the Flink Configuration documentation.

```sql
-- Read from branch b1
SELECT * FROM table /*+ OPTIONS('branch'='b1') */ ;

-- Read from tag t1
SELECT * FROM table /*+ OPTIONS('tag'='t1') */;

-- Incremental scan from tag t1 to tag t2
SELECT * FROM table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='t1', 'end-tag'='t2') */;
```

## Reading with DataStream

Iceberg supports streaming and batch reads through the Java DataStream API.

### Batch Read

This example will read all records from an Iceberg table and then print them to stdout in a Flink batch job:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> batch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(false)
    .build();

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");
```

### Streaming read

This example will read incremental records starting from snapshot-id '3821550127947089987' and print them to stdout in a Flink streaming job:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startSnapshotId(3821550127947089987L)
    .build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
```

There are other options that can be set; please see [FlinkSource#Builder](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/source/FlinkSource.html).

## Reading with DataStream (FLIP-27 source)

The FLIP-27 source interface was introduced in Flink 1.12. It aims to solve several shortcomings of the old SourceFunction streaming source interface, and it unifies the source interfaces for both batch and streaming execution. Most source connectors in the Flink repo (like Kafka and file sources) have migrated to the FLIP-27 interface. Flink is planning to deprecate the old SourceFunction interface in the near future.

A FLIP-27 based Flink IcebergSource is added in the iceberg-flink module. The FLIP-27 IcebergSource is currently an experimental feature.

### Batch Read

This example will read all records from an Iceberg table and then print them to stdout in a Flink batch job:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .build();

DataStream<RowData> batch = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");
```

### Streaming read

This example will start the streaming read from the latest table snapshot (inclusive). Every 60 seconds, it polls the Iceberg table to discover new append-only snapshots. CDC read is not supported yet.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .monitorInterval(Duration.ofSeconds(60))
    .build();

DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
```

There are other options that can be set via the Java API; please see [IcebergSource#Builder](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/source/IcebergSource.html).

### Reading branches and tags with DataStream

Branches and tags can also be read via the DataStream API:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

// Read from branch
DataStream<RowData> branchRead = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .branch("test-branch")
    .streaming(false)
    .build();

// Read from tag
DataStream<RowData> tagRead = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .tag("test-tag")
    .streaming(false)
    .build();

// Streaming read from start-tag
DataStream<RowData> streamingTagRead = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startTag("test-tag")
    .build();
```

### Read as Avro GenericRecord

The FLIP-27 Iceberg source provides AvroGenericRecordReaderFunction, which converts Flink RowData to Avro GenericRecord. You can use the converter to read from an Iceberg table as an Avro GenericRecord DataStream.

Please make sure the flink-avro jar is included in the classpath. Also, the iceberg-flink-runtime shaded bundle jar can't be used because the runtime jar shades the Avro package. Please use the non-shaded iceberg-flink jar instead.

```java
TableLoader tableLoader = ...;
Table table;
try (TableLoader loader = tableLoader) {
    loader.open();
    table = loader.loadTable();
}

AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);

IcebergSource<GenericRecord> source =
    IcebergSource.<GenericRecord>builder()
        .tableLoader(tableLoader)
        .readerFunction(readerFunction)
        .assignerFactory(new SimpleSplitAssignerFactory())
        ...
        .build();

// avroSchema is the Avro schema corresponding to the table schema.
DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
    "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
```

## Options

### Read options

Flink read options are passed when configuring the Flink IcebergSource:

```java
IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofSeconds(10)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .build();
```

For Flink SQL, read options can be passed in via SQL hints like this:

```sql
SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
...
```

Options can also be passed in via the Flink configuration, which will be applied to the current session. Note that not all options support this mode.

```java
env.getConfig()
    .getConfiguration()
    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
...
```

Check out all the options here: read-options

## Inspecting tables

To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.

Metadata tables are identified by adding the metadata table name after the original table name. For example, history for db.table is read using db.table$history.
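The same naming pattern applies to every metadata table described below, for example:

```sql
-- Append the metadata table name after a '$' to the original table name
SELECT * FROM db.table$history;   -- table history
SELECT * FROM db.table$snapshots; -- table snapshots
```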

### History

To show table history:

```sql
SELECT * FROM prod.db.table$history;
```

| made_current_at | snapshot_id | parent_id | is_current_ancestor |
| -- | -- | -- | -- |
| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true |
| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |

{{< hint info >}} This shows a commit that was rolled back. In this example, snapshots 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is not an ancestor of the current table state. {{< /hint >}}
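Rolled-back commits like this can be listed directly by filtering the history table on `is_current_ancestor`:

```sql
-- List snapshots that are no longer ancestors of the current table state
SELECT made_current_at, snapshot_id, parent_id
FROM prod.db.table$history
WHERE is_current_ancestor = false;
```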

### Metadata Log Entries

To show table metadata log entries:

```sql
SELECT * FROM prod.db.table$metadata_log_entries;
```

| timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number |
| -- | -- | -- | -- | -- |
| 2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null |
| 2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 |
| 2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 |

### Snapshots

To show the valid snapshots for a table:

```sql
SELECT * FROM prod.db.table$snapshots;
```

| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
| -- | -- | -- | -- | -- | -- |
| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813 } |

You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:

```sql
select
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary['flink.job-id']
from prod.db.table$history h
join prod.db.table$snapshots s
  on h.snapshot_id = s.snapshot_id
order by made_current_at
```

| made_current_at | operation | snapshot_id | is_current_ancestor | summary[flink.job-id] |
| -- | -- | -- | -- | -- |
| 2019-02-08 03:29:51.215 | append | 57897183625154 | true | 2e274eecb503d85369fb390e8956c813 |

### Files

To show a table's current data files:

```sql
SELECT * FROM prod.db.table$files;
```

| content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
| -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- |
| 0 | s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null |
| 0 | s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null |
| 0 | s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null |

### Manifests

To show a table's current file manifests:

```sql
SELECT * FROM prod.db.table$manifests;
```

| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
| -- | -- | -- | -- | -- | -- | -- | -- |
| s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,null,2019-05-13,2019-05-15]] |

Note:

1. Fields within the `partition_summaries` column of the manifests table correspond to `field_summary` structs within the manifest list, with the following order (see the example query after this list):
    - `contains_null`
    - `contains_nan`
    - `lower_bound`
    - `upper_bound`
2. `contains_nan` could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from a V1 table, where `contains_nan` is not populated.
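As an illustration of that ordering, the summary fields can be read back by name; this is a sketch that assumes Flink SQL's 1-based array indexing and field access on ROW-typed array elements:

```sql
-- A sketch: inspect the summary of the first partition field of each manifest.
-- partition_summaries[1] is assumed to be a ROW whose fields can be accessed by name.
SELECT
    path,
    partition_summaries[1].contains_null AS first_field_contains_null,
    partition_summaries[1].lower_bound   AS first_field_lower_bound,
    partition_summaries[1].upper_bound   AS first_field_upper_bound
FROM prod.db.table$manifests;
```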

### Partitions

To show a table's current partitions:

```sql
SELECT * FROM prod.db.table$partitions;
```

| partition | record_count | file_count | spec_id |
| -- | -- | -- | -- |
| {20211001, 11} | 1 | 1 | 0 |
| {20211002, 11} | 1 | 1 | 0 |
| {20211001, 10} | 1 | 1 | 0 |
| {20211002, 10} | 1 | 1 | 0 |

Note: For unpartitioned tables, the partitions table will contain only the record_count and file_count columns.
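For instance, querying the partitions table of a hypothetical unpartitioned table `prod.db.unpartitioned_tbl` would return only those two columns:

```sql
-- Hypothetical unpartitioned table: no partition or spec_id columns are returned
SELECT * FROM prod.db.unpartitioned_tbl$partitions;
```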

### All Metadata Tables

These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.

{{< hint danger >}} The “all” metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot. {{< /hint >}}
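Because of that, queries over the "all" tables often deduplicate; for example, to list each data file path once across all snapshots:

```sql
-- Deduplicate file paths that appear in more than one snapshot
SELECT DISTINCT file_path
FROM prod.db.table$all_data_files;
```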

#### All Data Files

To show all of the table's data files and each file's metadata:

```sql
SELECT * FROM prod.db.table$all_data_files;
```

| content | file_path | file_format | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
| -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- |
| 0 | s3://.../dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet | PARQUET | {20210102} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210102} | {1 -> 2, 2 -> 20210102} | null | [4] | null | 0 |
| 0 | s3://.../dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet | PARQUET | {20210103} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210103} | {1 -> 3, 2 -> 20210103} | null | [4] | null | 0 |
| 0 | s3://.../dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet | PARQUET | {20210104} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210104} | {1 -> 3, 2 -> 20210104} | null | [4] | null | 0 |

#### All Manifests

To show all of the table's manifest files:

```sql
SELECT * FROM prod.db.table$all_manifests;
```

| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
| -- | -- | -- | -- | -- | -- | -- | -- |
| s3://.../metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro | 6376 | 0 | 6272782676904868561 | 2 | 0 | 0 | [{false, false, 20210101, 20210101}] |

Note:

1. Fields within the `partition_summaries` column of the manifests table correspond to `field_summary` structs within the manifest list, with the following order:
    - `contains_null`
    - `contains_nan`
    - `lower_bound`
    - `upper_bound`
2. `contains_nan` could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from a V1 table, where `contains_nan` is not populated.

### References

To show a table's known snapshot references:

```sql
SELECT * FROM prod.db.table$refs;
```

| name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
| -- | -- | -- | -- | -- | -- |
| main | BRANCH | 4686954189838128572 | 10 | 20 | 30 |
| testTag | TAG | 4686954189838128572 | 10 | null | null |