---
title: "Flink Configuration"
url: flink-configuration
aliases:
    - "flink/flink-configuration"
menu:
    main:
        parent: Flink
        identifier: flink_configuration
        weight: 600
---

# Flink Configuration

## Catalog Configuration

A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and `<config_key>=<config_value>` with catalog implementation configuration):

```sql
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
);
```

The following properties can be set globally and are not limited to a specific catalog implementation:

| Property | Required | Values | Description |
| -------- | -------- | ------ | ----------- |
| type | ✔️ | iceberg | Must be `iceberg`. |
| catalog-type | | `hive`, `hadoop` or `rest` | `hive`, `hadoop` or `rest` for built-in catalogs, or left unset for custom catalog implementations using `catalog-impl`. |
| catalog-impl | | | The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. |
| property-version | | | Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. |
| cache-enabled | | `true` or `false` | Whether to enable catalog cache. Default value is `true`. |
| cache.expiration-interval-ms | | | How long catalog entries are locally cached, in milliseconds. Negative values like `-1` disable expiration; the value `0` is not allowed. Default value is `-1`. |
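
For example, the cache properties above can be combined with any catalog type. The sketch below, which assumes a Hadoop catalog with a placeholder catalog name and warehouse path, disables catalog caching entirely:

```sql
-- Hypothetical example: a Hadoop catalog with catalog caching disabled.
-- The catalog name and warehouse path are placeholders.
CREATE CATALOG uncached_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'cache-enabled'='false'
);
```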

The following properties can be set if using the Hive catalog:

| Property | Required | Values | Description |
| -------- | -------- | ------ | ----------- |
| uri | ✔️ | | The Hive metastore's thrift URI. |
| clients | | | The Hive metastore client pool size. Default value is `2`. |
| warehouse | | | The Hive warehouse location. Users should specify this path if neither `hive-conf-dir` is set (to point at a location containing a `hive-site.xml` configuration file) nor a correct `hive-site.xml` is added to the classpath. |
| hive-conf-dir | | | Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file on the classpath) will be overwritten with the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog. |
| hadoop-conf-dir | | | Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values. |
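
As an illustration, here is a sketch of a Hive catalog definition; the catalog name, metastore URI, client pool size, and warehouse path are placeholders that would need to match your deployment:

```sql
-- Hypothetical example: a Hive catalog backed by a metastore at thrift://localhost:9083,
-- using a client pool of 5 and an explicit warehouse location.
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```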

The following properties can be set if using the Hadoop catalog:

| Property | Required | Values | Description |
| -------- | -------- | ------ | ----------- |
| warehouse | ✔️ | | The HDFS directory to store metadata files and data files. |
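
A minimal sketch of a Hadoop catalog follows; the catalog name and warehouse path are placeholders:

```sql
-- Hypothetical example: a Hadoop catalog whose metadata and data files
-- live under the given HDFS path.
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```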

The following properties can be set if using the REST catalog:

| Property | Required | Values | Description |
| -------- | -------- | ------ | ----------- |
| uri | ✔️ | | The URL to the REST Catalog. |
| credential | | | A credential to exchange for a token in the OAuth2 client credentials flow. |
| token | | | A token which will be used to interact with the server. |
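
For reference, a sketch of a REST catalog definition; the catalog name, endpoint, and credential are placeholders:

```sql
-- Hypothetical example: a REST catalog reachable at http://localhost:8181,
-- authenticating via the OAuth2 client credentials flow.
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='http://localhost:8181',
  'credential'='<client_id>:<client_secret>'
);
```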

## Runtime configuration

### Read options

Flink read options are passed when configuring the Flink IcebergSource:

```java
IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .build()
```

For Flink SQL, read options can be passed in via SQL hints like this:

```sql
SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
...
```

Options can also be passed in via Flink configuration, which will be applied to the current session. Note that not all options support this mode.

```java
env.getConfig()
    .getConfiguration()
    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
...
```

Read options have the highest priority, followed by Flink configuration and then table properties.

| Read option | Flink configuration | Table property | Default | Description |
| ----------- | ------------------- | -------------- | ------- | ----------- |
| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. |
| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column names in a case sensitive way. |
| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan, then switch to incremental mode; the incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive; if the table is empty, all future append snapshots will be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive; if the table is empty, all future append snapshots will be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id, inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp, inclusive; if the timestamp is between two snapshots, it starts from the snapshot after the timestamp. Applies only to the FLIP-27 source. |
| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
| branch | N/A | N/A | main | Specifies the branch to read from in batch mode. |
| tag | N/A | N/A | null | Specifies the tag to read from in batch mode. |
| start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads. |
| end-tag | N/A | N/A | null | Specifies the ending tag to read from for incremental reads. |
| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
| split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4 MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
| include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
| limit | connector.iceberg.limit | N/A | -1 | Limits the output number of rows. |
| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 to never fail the job for scan planning failures. |
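
To tie several of these options together, the following is a sketch of a streaming read expressed with SQL hints; the table name is a placeholder and the option values are illustrative only:

```sql
-- Hypothetical example: continuously read append snapshots, starting from the
-- earliest snapshot and checking for new snapshots every 30 seconds.
SELECT * FROM tableName
/*+ OPTIONS('streaming'='true',
            'monitor-interval'='30s',
            'starting-strategy'='INCREMENTAL_FROM_EARLIEST_SNAPSHOT') */;
```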

### Write options

Flink write options are passed when configuring the FlinkSink, like this:

```java
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
```

For Flink SQL, write options can be passed in via SQL hints like this:

```sql
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```

| Flink option | Default | Description |
| ------------ | ------- | ----------- |
| write-format | Table write.format.default | File format to use for this write operation; `parquet`, `avro`, or `orc`. |
| target-file-size-bytes | As per table property | Overrides this table's `write.target-file-size-bytes`. |
| upsert-enabled | Table write.upsert.enabled | Overrides this table's `write.upsert.enabled`. |
| overwrite-enabled | false | Overwrites the table's data; overwrite mode should not be enabled when configuring to use UPSERT data stream. |
| distribution-mode | Table write.distribution-mode | Overrides this table's `write.distribution-mode`. |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write. |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write. |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write. |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism. |
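
As a closing illustration, here is a sketch of an INSERT that overrides several write options via SQL hints; the table names are placeholders and the option values are only examples:

```sql
-- Hypothetical example: write Parquet files with zstd compression and a writer
-- parallelism of 4, regardless of the table's default write properties.
INSERT INTO tableName /*+ OPTIONS('write-format'='parquet',
                                  'compression-codec'='zstd',
                                  'write-parallelism'='4') */
SELECT * FROM sourceTable;
```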