---
title: "Flink Writes"
url: flink-writes
aliases:
- "flink/flink-writes"
menu:
    main:
        parent: Flink
        identifier: flink_writes
        weight: 400
---
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->
# Flink Writes
Iceberg supports batch and streaming writes with [Apache Flink](https://flink.apache.org/)'s DataStream API and Table API.
## Writing with SQL
Iceberg supports both `INSERT INTO` and `INSERT OVERWRITE`.
### `INSERT INTO`
To append new data to a table with a Flink streaming job, use `INSERT INTO`:
```sql
INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data FROM other_kafka_table;
```
### `INSERT OVERWRITE`
To replace data in the table with the result of a query, use `INSERT OVERWRITE` in a batch job (Flink streaming jobs do not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.
Partitions that have rows produced by the SELECT query will be replaced, for example:
```sql
INSERT OVERWRITE sample VALUES (1, 'a');
```
Iceberg also supports overwriting a given partition with the `SELECT` values:
```sql
INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
```
For a partitioned Iceberg table, when all of the partition columns are given a value in the `PARTITION` clause, the insert targets a static partition; when only some of the partition columns (a prefix of all partition columns) are given a value in the `PARTITION` clause, the query result is written into a dynamic partition, as sketched below.
For an unpartitioned Iceberg table, its data will be completely overwritten by `INSERT OVERWRITE`.
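Below is a minimal sketch (not from the Iceberg documentation) of static and dynamic partition overwrites run from a Flink Java program via `TableEnvironment.executeSql`. The `sample_by_dt_country` table (columns `id`, `data`, `dt`, `country`, partitioned by `dt, country`) and `source_table` are hypothetical, and catalog setup is omitted.
```java
// Hypothetical table: sample_by_dt_country(id INT, data STRING, dt STRING, country STRING),
// partitioned by (dt, country). Catalog registration is assumed to have been done already.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Static partition overwrite: every partition column is fixed in the PARTITION clause,
// so the statically specified columns are not part of the SELECT list.
tEnv.executeSql(
    "INSERT OVERWRITE `hive_catalog`.`default`.`sample_by_dt_country` " +
    "PARTITION (dt='2023-01-01', country='US') " +
    "SELECT id, data FROM source_table");

// Dynamic partition overwrite: only the prefix column `dt` is fixed; the value of
// `country` for each row comes from the query result.
tEnv.executeSql(
    "INSERT OVERWRITE `hive_catalog`.`default`.`sample_by_dt_country` " +
    "PARTITION (dt='2023-01-01') " +
    "SELECT id, data, country FROM source_table");
```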
### `UPSERT`
Iceberg supports `UPSERT` based on the primary key when writing data into v2 table format. There are two ways to enable upsert.
1. Enable `UPSERT` mode as a table-level property with `write.upsert.enabled`. Here is an example SQL statement that sets the table property when creating a table. It applies to all write paths to this table (batch or streaming) unless overridden by the write options described later.
```sql
CREATE TABLE `hive_catalog`.`default`.`sample` (
    `id` INT UNIQUE COMMENT 'unique id',
    `data` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH ('format-version'='2', 'write.upsert.enabled'='true');
```
2. Enabling `UPSERT` mode using `upsert-enabled` in the [write options](#write-options) provides more flexibility than a table-level config. Note that you still need to use the v2 table format and specify the primary key when creating the table.
```sql
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```
{{< hint info >}}
OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in the equality fields.
{{< /hint >}}
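For a partitioned upsert table, the simplest way to satisfy that constraint is to include the partition column in the primary key. Below is a sketch (the `sample_partitioned` table, its columns, and the use of `TableEnvironment` are illustrative assumptions, not from the Iceberg documentation):
```java
// Sketch of a partitioned v2 table in UPSERT mode: the partition column `dt` is part of
// the primary key, so it is included in the equality fields. Names are hypothetical.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
    "CREATE TABLE `hive_catalog`.`default`.`sample_partitioned` (" +
    "  `id` INT, " +
    "  `dt` STRING, " +
    "  `data` STRING, " +
    "  PRIMARY KEY (`id`, `dt`) NOT ENFORCED" +
    ") PARTITIONED BY (`dt`) " +
    "WITH ('format-version'='2', 'write.upsert.enabled'='true')");
```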
## Writing with DataStream
Iceberg supports writing to Iceberg tables from different DataStream inputs.
### Appending data
The Iceberg sink natively supports writing `DataStream<RowData>` and `DataStream<Row>` to an Iceberg table.
```java
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();
env.execute("Test Iceberg DataStream");
```
The Iceberg API also allows users to write a generic `DataStream<T>` to an Iceberg table; more examples can be found in this [unit test](https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).
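As a rough sketch (the `MyPojo` type, its fields, and the `table` and `tableLoader` variables are assumptions for illustration), a generic stream can be written by supplying a `MapFunction` that converts each element to `RowData`:
```java
// Hypothetical POJO with an `id` (int) and `data` (String) field matching the table schema.
DataStream<MyPojo> pojoStream = ...;
RowType rowType = FlinkSchemaUtil.convert(table.schema());

// Convert each element to RowData; the explicit TypeInformation tells Flink the output type.
MapFunction<MyPojo, RowData> mapper =
    pojo -> GenericRowData.of(pojo.id, StringData.fromString(pojo.data));

FlinkSink.builderFor(pojoStream, mapper, FlinkCompatibilityUtil.toTypeInfo(rowType))
    .table(table)
    .tableLoader(tableLoader)
    .append();
```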
### Overwrite data
Set the `overwrite` flag in the FlinkSink builder to overwrite the data in existing Iceberg tables:
```java
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .append();
env.execute("Test Iceberg DataStream");
```
### Upsert data
Set the `upsert` flag in the FlinkSink builder to upsert the data in an existing Iceberg table. The table must use the v2 table format and have a primary key.
```java
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .upsert(true)
    .append();
env.execute("Test Iceberg DataStream");
```
{{< hint info >}}
OVERWRITE and UPSERT can't be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in the equality fields.
{{< /hint >}}
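The equality fields default to the table's identifier fields (the primary key); they can also be set explicitly on the builder. A sketch, assuming the table has an `id` column:
```java
// A sketch: explicitly set the columns used as equality fields for upsert
// (here the assumed `id` column) instead of relying on the table's identifier fields.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .upsert(true)
    .equalityFieldColumns(Arrays.asList("id"))
    .append();
```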
### Write with Avro GenericRecord
The Flink Iceberg sink provides an `AvroGenericRecordToRowDataMapper` that converts
Avro `GenericRecord` to Flink `RowData`. You can use the mapper to write an
Avro `GenericRecord` DataStream to Iceberg.
Make sure the `flink-avro` jar is included in the classpath.
Also note that the shaded `iceberg-flink-runtime` bundle jar can't be used,
because the runtime jar shades the Avro package;
use the non-shaded `iceberg-flink` jar instead.
```java
DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;
Schema icebergSchema = table.schema();
// The Avro schema converted from Iceberg schema can't be used
// due to precision difference between how Iceberg schema (micro)
// and Flink AvroToRowDataConverters (milli) deal with time type.
// Instead, use the Avro schema defined directly.
// See AvroGenericRecordToRowDataMapper Javadoc for more details.
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, table.name());
GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
FlinkSink.builderFor(
        dataStream,
        AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
        FlinkCompatibilityUtil.toTypeInfo(rowType))
    .table(table)
    .tableLoader(tableLoader)
    .append();
```
### Branch Writes
Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.
For more information on branches, please refer to [branches](../tables/branching).
```java
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .toBranch("audit-branch")
    .append();
```
### Metrics
The following Flink metrics are provided by the Flink Iceberg sink.
Parallel writer metrics are added under the sub group of `IcebergStreamWriter`.
They should have the following key-value tags.
* table: full table name (like iceberg.my_db.my_table)
* subtask_index: writer subtask index starting from 0
| Metric name                | Metric type | Description                                                                                                |
| -------------------------- | ----------- | ---------------------------------------------------------------------------------------------------------- |
| lastFlushDurationMs        | Gauge       | The duration (in milliseconds) that writer subtasks take to flush and upload the files during checkpoint.  |
| flushedDataFiles           | Counter     | Number of data files flushed and uploaded.                                                                  |
| flushedDeleteFiles         | Counter     | Number of delete files flushed and uploaded.                                                                |
| flushedReferencedDataFiles | Counter     | Number of data files referenced by the flushed delete files.                                                |
| dataFilesSizeHistogram     | Histogram   | Histogram distribution of data file sizes (in bytes).                                                       |
| deleteFilesSizeHistogram   | Histogram   | Histogram distribution of delete file sizes (in bytes).                                                     |
Committer metrics are added under the sub group of `IcebergFilesCommitter`.
They should have the following key-value tags.
* table: full table name (like iceberg.my_db.my_table)
| Metric name                             | Metric type | Description                                                                        |
| --------------------------------------- | ----------- | ----------------------------------------------------------------------------------- |
| lastCheckpointDurationMs                | Gauge       | The duration (in milliseconds) that the committer operator checkpoints its state.  |
| lastCommitDurationMs                    | Gauge       | The duration (in milliseconds) that the Iceberg table commit takes.                |
| committedDataFilesCount                 | Counter     | Number of data files committed.                                                     |
| committedDataFilesRecordCount           | Counter     | Number of records contained in the committed data files.                            |
| committedDataFilesByteCount             | Counter     | Number of bytes contained in the committed data files.                              |
| committedDeleteFilesCount               | Counter     | Number of delete files committed.                                                   |
| committedDeleteFilesRecordCount         | Counter     | Number of records contained in the committed delete files.                          |
| committedDeleteFilesByteCount           | Counter     | Number of bytes contained in the committed delete files.                            |
| elapsedSecondsSinceLastSuccessfulCommit | Gauge       | Elapsed time (in seconds) since the last successful Iceberg commit.                 |
`elapsedSecondsSinceLastSuccessfulCommit` is an ideal alerting metric
to detect failed or missing Iceberg commits.
* Iceberg commits happen after successful Flink checkpoints in the `notifyCheckpointComplete` callback.
Iceberg commits could fail (for whatever reason) while Flink checkpoints keep succeeding.
* It could also happen that `notifyCheckpointComplete` isn't triggered (due to some bug),
in which case no Iceberg commits would be attempted.
If the checkpoint interval (and expected Iceberg commit interval) is 5 minutes, set up an alert with a rule like `elapsedSecondsSinceLastSuccessfulCommit > 3600` (60 minutes) to detect failed or missing Iceberg commits in the past hour.
## Options
### Write options
Flink write options are passed when configuring the FlinkSink, like this:
```java
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
```
For Flink SQL, write options can be passed in via SQL hints like this:
```sql
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```
Check out all the options here: [write-options](/flink-configuration#write-options)