---
title: "Flink Getting Started"
url: flink
aliases:
    - "flink/flink"
menu:
    main:
        parent: Flink
        identifier: flink_getting_started
        weight: 100
---

Flink

Apache Iceberg supports both Apache Flink's DataStream API and Table API. See the Apache Flink section of the Multi-Engine Support page for details on the integration.

| Feature support | Flink | Notes |
| --- | --- | --- |
| SQL create catalog | ✔️ | |
| SQL create database | ✔️ | |
| SQL create table | ✔️ | |
| SQL create table like | ✔️ | |
| SQL alter table | ✔️ | Only altering table properties is supported; column and partition changes are not supported |
| SQL drop table | ✔️ | |
| SQL select | ✔️ | Supports both streaming and batch mode |
| SQL insert into | ✔️ | Supports both streaming and batch mode |
| SQL insert overwrite | ✔️ | |
| DataStream read | ✔️ | |
| DataStream append | ✔️ | |
| DataStream overwrite | ✔️ | |
| Metadata tables | ✔️ | |
| Rewrite files action | ✔️ | |

Preparation when using Flink SQL Client

To create an Iceberg table in Flink, we recommend using the Flink SQL Client, because it makes the concepts easier to follow.

Download Flink from the Apache download page. Iceberg uses Scala 2.12 when compiling the Apache iceberg-flink-runtime jar, so it's recommended to use Flink 1.16 bundled with Scala 2.12.

FLINK_VERSION=1.16.1
SCALA_VERSION=2.12
APACHE_FLINK_URL=https://archive.apache.org/dist/flink/
wget ${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
tar xzvf flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz

Start a standalone Flink cluster within a Hadoop environment:

# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/
HADOOP_VERSION=2.8.5
wget ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
tar xzvf hadoop-${HADOOP_VERSION}.tar.gz
HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the flink standalone cluster
./bin/start-cluster.sh

Start the Flink SQL client. The Iceberg project has a separate flink-runtime module that generates a bundled jar, which the Flink SQL client can load directly. To build the flink-runtime bundled jar manually, build the Iceberg project; the jar is generated under <iceberg-root-dir>/flink-runtime/build/libs. Alternatively, download the flink-runtime jar from the [Apache repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/{{% icebergVersion %}}/).

# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`   

./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-1.16-{{% icebergVersion %}}.jar shell

By default, Iceberg ships with Hadoop jars for the Hadoop catalog. To use the Hive catalog, load the Hive jars when opening the Flink SQL client. Flink provides a bundled Hive jar for the SQL client. The following example downloads the dependencies and starts the client:

# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Download the Iceberg Flink runtime jar into lib/.
ICEBERG_VERSION={{% icebergVersion %}}
# Flink release line used in the runtime jar name (1.16 for Flink 1.16.1).
FLINK_VERSION_MAJOR=1.16
MAVEN_URL=https://repo1.maven.org/maven2
ICEBERG_MAVEN_URL=${MAVEN_URL}/org/apache/iceberg
ICEBERG_PACKAGE=iceberg-flink-runtime
wget ${ICEBERG_MAVEN_URL}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}/${ICEBERG_VERSION}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}-${ICEBERG_VERSION}.jar -P lib/

# Download the Flink Hive connector jar.
HIVE_VERSION=2.3.9
SCALA_VERSION=2.12
FLINK_VERSION=1.16.1
FLINK_CONNECTOR_URL=${MAVEN_URL}/org/apache/flink
FLINK_CONNECTOR_PACKAGE=flink-sql-connector-hive
wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}/${FLINK_VERSION}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar

./bin/sql-client.sh embedded shell
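The download commands above assemble the runtime jar URL from a few version variables. As a minimal sketch of that naming scheme, the Python helper below derives the same URL from a full Flink version. The function name `runtime_jar_url` and the Iceberg version `1.3.0` in the example call are illustrative assumptions, not part of the Iceberg tooling.

```python
def runtime_jar_url(flink_version, iceberg_version,
                    maven_url="https://repo1.maven.org/maven2"):
    """Build the Maven URL of the iceberg-flink-runtime jar (hypothetical helper)."""
    # The artifact name uses only the first two version components: "1.16.1" -> "1.16".
    flink_major = ".".join(flink_version.split(".")[:2])
    artifact = "iceberg-flink-runtime-{}".format(flink_major)
    return "{}/org/apache/iceberg/{}/{}/{}-{}.jar".format(
        maven_url, artifact, iceberg_version, artifact, iceberg_version)


# "1.3.0" is a placeholder; substitute the Iceberg release you actually use.
print(runtime_jar_url("1.16.1", "1.3.0"))
```

Deriving the release line from the full version keeps the Flink distribution and the runtime jar in step when either one is upgraded.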

Flink's Python API

{{< hint info >}} PyFlink 1.16.1 does not work on macOS with an M1 CPU {{< /hint >}}

Install the Apache Flink dependency using pip:

pip install apache-flink==1.16.1

Provide a file:// path to the iceberg-flink-runtime jar. The jar can be obtained by building the project and looking in <iceberg-root-dir>/flink-runtime/build/libs, or by downloading it from the official Apache repository. Third-party jars can be added to PyFlink via:

  • env.add_jars("file:///my/jar/path/connector.jar")
  • table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar")

This is also mentioned in the official docs. The example below uses env.add_jars(..):

import os

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "iceberg-flink-runtime-1.16-{{% icebergVersion %}}.jar")

env.add_jars("file://{}".format(iceberg_flink_runtime_jar))
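The `pipeline.jars` alternative listed above takes a semicolon-separated list of jar URIs, which is convenient when more than one jar is needed. The sketch below builds such a value with `pathlib`; the helper name `pipeline_jars_value` and the jar file names are illustrative assumptions.

```python
from pathlib import Path


def pipeline_jars_value(*jar_paths):
    """Join local jar paths into a semicolon-separated list of file:// URIs (hypothetical helper)."""
    # resolve() makes each path absolute; as_uri() adds the file:// scheme
    # and percent-encodes characters that are not valid in a URI.
    return ";".join(Path(p).resolve().as_uri() for p in jar_paths)


value = pipeline_jars_value("iceberg-flink-runtime.jar", "connector.jar")
print(value)
# With an existing table environment, the value could then be applied as:
# table_env.get_config().get_configuration().set_string("pipeline.jars", value)
```

Using `as_uri()` rather than string formatting avoids malformed URIs when the working directory contains spaces or other special characters.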

Next, create a StreamTableEnvironment and execute Flink SQL statements. The example below shows how to create a custom catalog via the Python Table API:

from pyflink.table import StreamTableEnvironment
table_env = StreamTableEnvironment.create(env)
table_env.execute_sql("""
CREATE CATALOG my_catalog WITH (
    'type'='iceberg', 
    'catalog-impl'='com.my.custom.CatalogImpl',
    'my-additional-catalog-config'='my-value'
)
""")

Run a query:

(table_env
    .sql_query("SELECT PULocationID, DOLocationID, passenger_count FROM my_catalog.nyc.taxis LIMIT 5")
    .execute()
    .print()) 
+----+----------------------+----------------------+--------------------------------+
| op |         PULocationID |         DOLocationID |                passenger_count |
+----+----------------------+----------------------+--------------------------------+
| +I |                  249 |                   48 |                            1.0 |
| +I |                  132 |                  233 |                            1.0 |
| +I |                  164 |                  107 |                            1.0 |
| +I |                   90 |                  229 |                            1.0 |
| +I |                  137 |                  249 |                            1.0 |
+----+----------------------+----------------------+--------------------------------+
5 rows in set

For more details, please refer to the Python Table API.

Adding catalogs

Flink supports creating catalogs using Flink SQL.

Catalog Configuration

A catalog is created and named by executing the following query (replace <catalog_name> with your catalog name and <config_key>=<config_value> with catalog implementation config):

CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
);

The following properties can be set globally and are not limited to a specific catalog implementation:

  • type: Must be iceberg. (required)
  • catalog-type: hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. (Optional)
  • catalog-impl: The fully-qualified class name of a custom catalog implementation. Must be set if catalog-type is unset. (Optional)
  • property-version: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is 1. (Optional)
  • cache-enabled: Whether to enable the catalog cache. The default value is true. (Optional)
  • cache.expiration-interval-ms: How long catalog entries are cached locally, in milliseconds. Negative values such as -1 disable expiration; a value of 0 is not allowed. The default value is -1. (Optional)
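The constraints in the list above can be checked before a statement is sent to Flink. The following is a minimal sketch, assuming a hypothetical helper `create_catalog_sql` that validates a property dictionary against those rules and renders the CREATE CATALOG statement; it is not part of Iceberg or PyFlink.

```python
def create_catalog_sql(name, props):
    """Render a CREATE CATALOG statement after checking the documented property rules."""
    if props.get("type") != "iceberg":
        raise ValueError("'type' must be 'iceberg'")
    if "catalog-type" not in props and "catalog-impl" not in props:
        raise ValueError("'catalog-impl' must be set when 'catalog-type' is unset")
    if str(props.get("cache.expiration-interval-ms", "-1")).strip() == "0":
        raise ValueError("'cache.expiration-interval-ms' must not be 0")

    def quote(value):
        # SQL string literals escape a single quote by doubling it.
        return "'" + str(value).replace("'", "''") + "'"

    body = ",\n".join("  {}={}".format(quote(k), quote(v)) for k, v in props.items())
    return "CREATE CATALOG {} WITH (\n{}\n)".format(name, body)


sql = create_catalog_sql("hive_catalog", {
    "type": "iceberg",
    "catalog-type": "hive",
    "uri": "thrift://localhost:9083",
})
print(sql)
```

The resulting string can be passed to `table_env.execute_sql(...)` in the same way as the custom catalog example earlier on this page.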

Hive catalog

The following creates an Iceberg catalog named hive_catalog, configured with 'catalog-type'='hive', that loads tables from a Hive metastore:

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

The following properties can be set if using the Hive catalog:

  • uri: The Hive metastore's thrift URI. (Required)
  • clients: The Hive metastore client pool size, default value is 2. (Optional)
  • warehouse: The Hive warehouse location. Specify this path if you neither set hive-conf-dir to a directory containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath.
  • hive-conf-dir: Path to a directory containing a hive-site.xml configuration file, which is used to provide custom Hive configuration values. If both hive-conf-dir and warehouse are set when creating the Iceberg catalog, the warehouse value overrides hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or from the Hive configuration file on the classpath).
  • hadoop-conf-dir: Path to a directory containing core-site.xml and hdfs-site.xml configuration files which will be used to provide custom Hadoop configuration values.

Creating a table

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

Writing

To append new data to a table with a Flink streaming job, use INSERT INTO:

INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;

To replace data in the table with the result of a query, use INSERT OVERWRITE in a batch job (Flink streaming jobs do not support INSERT OVERWRITE). Overwrites are atomic operations for Iceberg tables.

Partitions that have rows produced by the SELECT query will be replaced, for example:

INSERT OVERWRITE `hive_catalog`.`default`.`sample` VALUES (1, 'a');

Iceberg also supports overwriting given partitions with the selected values:

INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
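To make the partition replacement rule concrete, here is a small conceptual model in plain Python. It only illustrates which rows survive an overwrite and is not how Iceberg implements the operation. It assumes rows of the form (id, data) and a table partitioned by the data column; the function name `overwrite_partitions` is hypothetical.

```python
def overwrite_partitions(existing_rows, new_rows, partition_of):
    """Replace every partition that receives at least one new row; keep the others."""
    touched = {partition_of(row) for row in new_rows}
    kept = [row for row in existing_rows if partition_of(row) not in touched]
    return kept + list(new_rows)


# Rows are (id, data); the table is assumed to be partitioned by the data column.
existing = [(1, "a"), (2, "a"), (3, "b")]
result = overwrite_partitions(existing, [(6, "a")], partition_of=lambda row: row[1])
print(result)  # partition 'a' is replaced, partition 'b' is untouched
```

In this model, an overwrite that produces no rows for a partition leaves that partition as it was.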

Flink supports writing DataStream<RowData> and DataStream<Row> natively to an Iceberg sink table.

StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();

env.execute("Test Iceberg DataStream");

Branch Writes

Writing to branches in Iceberg tables is also supported via the toBranch API in FlinkSink. For more information on branches, please refer to branches.

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .toBranch("audit-branch")
    .append();

Reading

Submit a Flink batch job using the following statements:

-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM `hive_catalog`.`default`.`sample`;

Iceberg supports processing incremental data in Flink streaming jobs, starting from a historical snapshot-id:

-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;

-- Enable this switch because the streaming read SQL passes a few job options through Flink SQL hint options.
SET table.dynamic-table-options.enabled=true;

-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
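When these queries are issued from PyFlink, the OPTIONS hint can be assembled programmatically. The sketch below uses only the three options shown above; the helper name `streaming_read_sql` is a hypothetical convenience, not an Iceberg or Flink API.

```python
def streaming_read_sql(table, monitor_interval="1s", start_snapshot_id=None):
    """Build a streaming SELECT with an OPTIONS hint (hypothetical helper)."""
    options = [("streaming", "true"), ("monitor-interval", monitor_interval)]
    if start_snapshot_id is not None:
        # Records from the start snapshot itself are excluded from the read.
        options.append(("start-snapshot-id", str(start_snapshot_id)))
    hint = ", ".join("'{}'='{}'".format(k, v) for k, v in options)
    return "SELECT * FROM {} /*+ OPTIONS({}) */".format(table, hint)


query = streaming_read_sql("`hive_catalog`.`default`.`sample`",
                           start_snapshot_id=3821550127947089987)
print(query)
# With a StreamTableEnvironment and table.dynamic-table-options.enabled set to true:
# table_env.sql_query(query).execute().print()
```

Omitting `start_snapshot_id` yields the first form of the query, which reads the current snapshot and then continues incrementally.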

SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the snapshots metadata table:

SELECT * FROM `hive_catalog`.`default`.`sample`.`snapshots`

Iceberg supports streaming and batch reads in the Java API:

DataStream<RowData> batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(false)
     .build();

Type conversion

Iceberg's integration for Flink automatically converts between Flink and Iceberg types. When writing to a table with types that are not supported by Flink, like UUID, Iceberg will accept and convert values from the Flink type.

Flink to Iceberg

Flink types are converted to Iceberg types according to the following table:

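| Flink | Iceberg | Notes |
| --- | --- | --- |
| boolean | boolean | |
| tinyint | integer | |
| smallint | integer | |
| integer | integer | |
| bigint | long | |
| float | float | |
| double | double | |
| char | string | |
| varchar | string | |
| string | string | |
| binary | binary | |
| varbinary | fixed | |
| decimal | decimal | |
| date | date | |
| time | time | |
| timestamp | timestamp without timezone | |
| timestamp_ltz | timestamp with timezone | |
| array | list | |
| map | map | |
| multiset | map | |
| row | struct | |
| raw | | Not supported |
| interval | | Not supported |
| structured | | Not supported |
| timestamp with zone | | Not supported |
| distinct | | Not supported |
| null | | Not supported |
| symbol | | Not supported |
| logical | | Not supported |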

Iceberg to Flink

Iceberg types are converted to Flink types according to the following table:

| Iceberg | Flink |
| --- | --- |
| boolean | boolean |
| struct | row |
| list | array |
| map | map |
| integer | integer |
| long | bigint |
| float | float |
| double | double |
| date | date |
| time | time |
| timestamp without timezone | timestamp(6) |
| timestamp with timezone | timestamp_ltz(6) |
| string | varchar(2147483647) |
| uuid | binary(16) |
| fixed(N) | binary(N) |
| binary | varbinary(2147483647) |
| decimal(P, S) | decimal(P, S) |
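One consequence of the two tables is that a Flink type does not always come back unchanged after being stored in Iceberg. The sketch below encodes a subset of the unparameterized rows from both tables as dictionaries and composes them; the names `FLINK_TO_ICEBERG`, `ICEBERG_TO_FLINK`, and `round_trip` are illustrative, and the lookup is derived only from the tables on this page, not from the Iceberg source code.

```python
# Subset of the Flink to Iceberg table (unparameterized primitive types only).
FLINK_TO_ICEBERG = {
    "boolean": "boolean", "tinyint": "integer", "smallint": "integer",
    "integer": "integer", "bigint": "long", "float": "float", "double": "double",
    "char": "string", "varchar": "string", "string": "string",
    "date": "date", "time": "time",
    "timestamp": "timestamp without timezone",
    "timestamp_ltz": "timestamp with timezone",
}

# Matching rows of the Iceberg to Flink table.
ICEBERG_TO_FLINK = {
    "boolean": "boolean", "integer": "integer", "long": "bigint",
    "float": "float", "double": "double", "date": "date", "time": "time",
    "timestamp without timezone": "timestamp(6)",
    "timestamp with timezone": "timestamp_ltz(6)",
    "string": "varchar(2147483647)",
}


def round_trip(flink_type):
    """Return the Flink type read back after writing flink_type to Iceberg."""
    return ICEBERG_TO_FLINK[FLINK_TO_ICEBERG[flink_type]]


for flink_type in ("tinyint", "char", "bigint"):
    print(flink_type, "->", round_trip(flink_type))
```

According to the tables, narrow integer types are widened to integer and fixed-length character types come back as varchar, so schemas read from Iceberg may differ from the Flink schema that was written.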

Future improvements

Some features are not yet supported in the current Flink Iceberg integration:

  • Creating an Iceberg table with hidden partitioning is not supported. See the discussion on the Flink mailing list.
  • Creating an Iceberg table with a computed column is not supported.
  • Creating an Iceberg table with a watermark is not supported.
  • Adding, removing, renaming, and changing columns is not supported. FLINK-19062 is tracking this.