CREATE CATALOG
This creates an Iceberg catalog named hive_catalog that can be configured using 'catalog-type'='hive', which loads tables from the Hive metastore:
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
The following properties can be set if using the Hive catalog:
uri
: The Hive metastore's thrift URI. (Required)

clients
: The Hive metastore client pool size; the default value is 2. (Optional)

warehouse
: The Hive warehouse location. Users should specify this path if neither hive-conf-dir is set to specify a location containing a hive-site.xml configuration file nor a correct hive-site.xml is added to the classpath.

hive-conf-dir
: Path to a directory containing a hive-site.xml configuration file which will be used to provide custom Hive configuration values. If both hive-conf-dir and warehouse are set when creating an Iceberg catalog, the warehouse value overrides the value of hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or from the hive-site.xml on the classpath).

hadoop-conf-dir
: Path to a directory containing core-site.xml and hdfs-site.xml configuration files which will be used to provide custom Hadoop configuration values.

Iceberg also supports a directory-based catalog in HDFS that can be configured using 'catalog-type'='hadoop':
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
The following properties can be set if using the Hadoop catalog:
warehouse
: The HDFS directory to store metadata files and data files. (Required)

Execute the SQL command USE CATALOG hadoop_catalog to set the current catalog.
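For example, a minimal session setup against the Hadoop catalog defined above might look like this (SHOW DATABASES is shown only to illustrate that subsequent statements resolve against the selected catalog):

```sql
-- switch the session to the Hadoop catalog
USE CATALOG hadoop_catalog;
-- subsequent statements now resolve against hadoop_catalog
SHOW DATABASES;
```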
This creates an Iceberg catalog named rest_catalog that can be configured using 'catalog-type'='rest', which loads tables from a REST catalog:
CREATE CATALOG rest_catalog WITH ( 'type'='iceberg', 'catalog-type'='rest', 'uri'='https://localhost/' );
The following properties can be set if using the REST catalog:
uri
: The URL to the REST Catalog. (Required)

credential
: A credential to exchange for a token in the OAuth2 client credentials flow. (Optional)

token
: A token which will be used to interact with the server. (Optional)

Flink also supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property:
CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);
Catalogs can be registered in sql-client-defaults.yaml
before starting the SQL client.
catalogs:
  - name: my_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: hdfs://nn:8020/warehouse/path
The Flink SQL Client supports the -i startup option to execute an initialization SQL file that sets up the environment when the SQL Client starts.
-- define available catalogs
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

USE CATALOG hive_catalog;
Use the -i <init.sql> option to initialize the SQL Client session:
/path/to/bin/sql-client.sh -i /path/to/init.sql
CREATE DATABASE
By default, Iceberg will use the default database in Flink. Use the following example to create a separate database and avoid creating tables under the default database:
CREATE DATABASE iceberg_db;
USE iceberg_db;
CREATE TABLE
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING
);
Table create commands support the commonly used Flink create clauses, including:

PARTITIONED BY (column1, column2, ...)
: to configure partitioning. Flink does not yet support hidden partitioning.

COMMENT 'table document'
: to set a table description.

WITH ('key'='value', ...)
: to set table configuration which will be stored in Iceberg table properties.

Currently, it does not support computed columns, primary keys, or watermark definitions.
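Putting these clauses together, a sketch of a full table definition might look like the following (the property values shown, such as 'format-version'='2' and 'write.format.default'='parquet', are illustrative choices of standard Iceberg table properties):

```sql
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING
) COMMENT 'table document'
PARTITIONED BY (data)
WITH (
  -- stored as Iceberg table properties
  'format-version'='2',
  'write.format.default'='parquet'
);
```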
PARTITIONED BY
To create a partition table, use PARTITIONED BY
:
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING
) PARTITIONED BY (data);
Iceberg supports hidden partitioning, but Flink does not support partitioning by a function on columns, so hidden partitions cannot be declared through Flink DDL.
CREATE TABLE LIKE
To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE.
CREATE TABLE `hive_catalog`.`default`.`sample` (
  id BIGINT COMMENT 'unique id',
  data STRING
);

CREATE TABLE `hive_catalog`.`default`.`sample_like`
LIKE `hive_catalog`.`default`.`sample`;
For more details, refer to the Flink CREATE TABLE documentation.
ALTER TABLE
Iceberg only supports altering table properties:

ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');
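Any Iceberg table property can be updated the same way; for instance, the target data file size (write.target-file-size-bytes is a standard Iceberg table property, and the value here is illustrative):

```sql
-- raise the target data file size to 512 MB for future writes
ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.target-file-size-bytes'='536870912');
```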
ALTER TABLE .. RENAME TO
ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;
DROP TABLE
To delete a table, run:
DROP TABLE `hive_catalog`.`default`.`sample`;