Apache Iceberg supports both Apache Flink's DataStream API and Table API to write records into an Iceberg table. Currently, Iceberg integrates only with Apache Flink 1.11.x.
Feature support | Flink 1.11.0 | Notes |
---|---|---|
SQL create catalog | ✔️ | |
SQL create database | ✔️ | |
SQL create table | ✔️ | |
SQL create table like | ✔️ | |
SQL alter table | ✔️ | Only supports altering table properties; column and partition-key changes are not supported yet |
SQL drop table | ✔️ | |
SQL select | ✔️ | Only supports batch mode now |
SQL insert into | ✔️ | Supports both streaming and batch mode |
SQL insert overwrite | ✔️ | |
DataStream read | ✔️ | |
DataStream append | ✔️ | |
DataStream overwrite | ✔️ | |
Metadata tables | | Not supported in Flink SQL; use Iceberg's Java API |
To create Iceberg tables in Flink, we recommend using the Flink SQL Client because it makes the concepts easier for users to understand.
Step 1: Download the Flink 1.11.x binary package from the Apache Flink download page. We use Scala 2.12 when building the Apache `iceberg-flink-runtime` jar, so it's recommended to use Flink 1.11 bundled with Scala 2.12.
```shell
wget https://downloads.apache.org/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz
tar xzvf flink-1.11.1-bin-scala_2.12.tgz
```
Step 2: Start a standalone Flink cluster within the Hadoop environment.
```shell
# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the flink standalone cluster
./bin/start-cluster.sh
```
Step 3: Start the Flink SQL client.
We've created a separate `flink-runtime` module in the Iceberg project to generate a bundled jar, which can be loaded by the Flink SQL client directly. To build the `flink-runtime` bundled jar manually, just build the `iceberg` project; the jar will be generated under `<iceberg-root-dir>/flink-runtime/build/libs`. Of course, you can also download the `flink-runtime` jar from the Apache official repository.
```shell
# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell
```
By default, Iceberg includes Hadoop jars for the Hadoop catalog. To use the Hive catalog, you need to load the Hive jars when opening the Flink SQL client. Fortunately, Apache Flink provides a bundled Hive jar for the SQL client, so you can open the SQL client as follows:
```shell
# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# wget the flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar from the above bundled jar URL first.

# Open the SQL client.
./bin/sql-client.sh embedded \
    -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar \
    -j <hive-bundled-jar-directory>/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar \
    shell
```
Flink 1.11 supports creating catalogs using Flink SQL. This creates an Iceberg catalog named `hive_catalog` that loads tables from a Hive metastore:
```sql
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```
* `type`: Use `iceberg` for the Iceberg table format. (Required)
* `catalog-type`: Iceberg currently supports the `hive` or `hadoop` catalog type. (Required)
* `uri`: The Hive metastore's thrift URI. (Required)
* `clients`: The Hive metastore client pool size; the default value is 2. (Optional)
* `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. (Optional)
* `warehouse`: The Hive warehouse location. Users should specify this path if they neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath.
* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file on the classpath) will be overwritten with the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog.

Iceberg also supports a directory-based catalog in HDFS that can be configured using `'catalog-type'='hadoop'`:
```sql
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
```
* `warehouse`: The HDFS directory to store metadata files and data files. (Required)

Execute the SQL command `USE CATALOG hive_catalog` to set the current catalog.
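For example, a session can switch between the two catalogs created above (a minimal sketch; `SHOW DATABASES` simply lists the databases visible in the current catalog):

```sql
USE CATALOG hive_catalog;
SHOW DATABASES;

USE CATALOG hadoop_catalog;
SHOW DATABASES;
```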
Flink also supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property. When `catalog-impl` is set, the value of `catalog-type` is ignored. Here is an example:
```sql
CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);
```
CREATE DATABASE
By default, Iceberg uses the `default` database in Flink. If you don't want to create tables under the `default` database, use the following example to create a separate database:
```sql
CREATE DATABASE iceberg_db;
USE iceberg_db;
```
CREATE TABLE
```sql
CREATE TABLE hive_catalog.default.sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);
```
Table create commands support the most commonly used Flink create clauses, including:

* `PARTITIONED BY (column1, column2, ...)` to configure partitioning. Apache Flink does not yet support hidden partitioning.
* `COMMENT 'table document'` to set a table description.
* `WITH ('key'='value', ...)` to set table configuration which will be stored in Apache Iceberg table properties.

Currently, it does not support computed columns, primary keys, or watermark definitions.
PARTITIONED BY
To create a partitioned table, use `PARTITIONED BY`:
```sql
CREATE TABLE hive_catalog.default.sample (
  id BIGINT COMMENT 'unique id',
  data STRING
) PARTITIONED BY (data);
```
Apache Iceberg supports hidden partitioning, but Apache Flink doesn't support partitioning by a function on columns, so there is currently no way to support hidden partitioning in Flink DDL. We will improve Apache Flink DDL support in the future.
CREATE TABLE LIKE
To create a table with the same schema, partitioning, and table properties as another table, use `CREATE TABLE LIKE`.
```sql
CREATE TABLE hive_catalog.default.sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);

CREATE TABLE hive_catalog.default.sample_like LIKE hive_catalog.default.sample;
```
For more details, refer to the Flink `CREATE TABLE` documentation.
ALTER TABLE
Iceberg only supports altering table properties in Flink 1.11 now.
```sql
ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro');
```
ALTER TABLE .. RENAME TO
```sql
ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;
```
DROP TABLE
To delete a table, run:
```sql
DROP TABLE hive_catalog.default.sample;
```
Iceberg does not support streaming reads in Flink yet; that work is still in progress. But it does support batch reads to scan the existing records in an Iceberg table.
```sql
-- Execute the flink job in batch mode for current session context
SET execution.type = batch;

SELECT * FROM sample;
```
Notice: we can execute the following SQL commands to switch the execution type from 'streaming' mode to 'batch' mode, and vice versa:
```sql
-- Execute the flink job in streaming mode for current session context
SET execution.type = streaming;

-- Execute the flink job in batch mode for current session context
SET execution.type = batch;
```
Iceberg supports both `INSERT INTO` and `INSERT OVERWRITE` in Flink 1.11.
INSERT INTO
To append new data to a table with a Flink streaming job, use `INSERT INTO`:
```sql
INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
INSERT INTO hive_catalog.default.sample SELECT id, data FROM other_kafka_table;
```
INSERT OVERWRITE
To replace data in the table with the result of a query, use `INSERT OVERWRITE` in a batch job (Flink streaming jobs do not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables. Partitions that have rows produced by the SELECT query will be replaced, for example:
```sql
INSERT OVERWRITE sample VALUES (1, 'a');
```
Iceberg also supports overwriting given partitions with the selected values:
```sql
INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
```
For a partitioned Iceberg table, when all the partition columns are given a value in the `PARTITION` clause, the statement inserts into a static partition; when only some of the partition columns (a prefix of all partition columns) are given a value in the `PARTITION` clause, the query result is written into a dynamic partition. For an unpartitioned Iceberg table, its data will be completely overwritten by `INSERT OVERWRITE`.
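To make the static/dynamic distinction concrete, here is a sketch using a hypothetical table `sample2` partitioned by `(data, dt)` (the table name and columns are illustrative, not from the text above):

```sql
-- A hypothetical table with two partition columns
CREATE TABLE hive_catalog.default.sample2 (
  id BIGINT,
  data STRING,
  dt STRING
) PARTITIONED BY (data, dt);

-- Static partition overwrite: every partition column is given a value,
-- so exactly the partition (data='a', dt='2020-01-01') is replaced.
INSERT OVERWRITE hive_catalog.default.sample2 PARTITION (data='a', dt='2020-01-01') SELECT 1;

-- Dynamic partition overwrite: only the prefix column `data` is fixed;
-- the `dt` partition value comes from the query result.
INSERT OVERWRITE hive_catalog.default.sample2 PARTITION (data='a') SELECT 2, '2020-01-02';
```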
Iceberg does not support streaming or batch reads via the DataStream API yet; that work is in progress.
Iceberg supports writing to an Iceberg table from different DataStream inputs. Writing a `DataStream<RowData>` or a `DataStream<Row>` to the sink Iceberg table is supported natively.
```java
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");
```
The Iceberg API also allows users to write a generic `DataStream<T>` to an Iceberg table; more examples can be found in this unit test.
To overwrite the data in an existing Iceberg table dynamically, we can set the `overwrite` flag in the FlinkSink builder.
```java
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");
```
Iceberg does not support inspecting tables in Flink SQL yet; to read Iceberg's metadata and get table information, you need to use Iceberg's Java API.
There are some features that the current Flink Iceberg integration does not yet support: