| <!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Flink Writes</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-awesome.min.css rel=stylesheet type=text/css><link href="//fonts.googleapis.com/css?family=Lato:300,400,700,300italic,400italic,700italic" rel=stylesheet type=text/css><link href=../css/termynal.css rel=stylesheet></head><body><head><script>function addAnchor(e){e.insertAdjacentHTML("beforeend",`<a href="#${e.id}" class="anchortag" ariaLabel="Anchor"> 🔗 </a>`)}document.addEventListener("DOMContentLoaded",function(){var e=document.querySelectorAll("h1[id], h2[id], h3[id], h4[id]");e&&e.forEach(addAnchor)})</script></head><nav class="navbar navbar-default" role=navigation><topsection><div class=navbar-fixed-top><div><button type=button class=navbar-toggle data-toggle=collapse data-target=div.sidebar> |
| <span class=sr-only>Toggle navigation</span> |
| <span class=icon-bar></span> |
| <span class=icon-bar></span> |
| <span class=icon-bar></span></button> |
| <a class="page-scroll navbar-brand" href=https://iceberg.apache.org/><img class=top-navbar-logo src=https://iceberg.apache.org/docs/fd-update-javadocs//img/iceberg-logo-icon.png> Apache Iceberg</a></div><div><input type=search class=form-control id=search-input placeholder=Search... maxlength=64 data-hotkeys=s/></div><div class=versions-dropdown><span>1.4.1</span> <i class="fa fa-chevron-down"></i><div class=versions-dropdown-content><ul><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../latest>latest</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.4.1>1.4.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.4.0>1.4.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.3.1>1.3.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.3.0>1.3.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.2.1>1.2.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.2.0>1.2.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.1.0>1.1.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.0.0>1.0.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.14.1>0.14.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.14.0>0.14.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.13.2>0.13.2</a></li><li class=versions-dropdown-selection><a 
href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.13.1>0.13.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.13.0>0.13.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.12.1>0.12.1</a></li></ul></div></div></div><div class="navbar-menu-fixed-top navbar-pages-group"><div class=versions-dropdown><div class=topnav-page-selection><a href>Quickstart</a> <i class="fa fa-chevron-down"></i></div class="topnav-page-selection"><div class=versions-dropdown-content><ul><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../hive-quickstart>Hive</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../spark-quickstart>Spark</a></li class="topnav-page-selection"></ul></div></div><div class=topnav-page-selection><a id=active href=https://iceberg.apache.org/docs/fd-update-javadocs/../../docs/latest>Docs</a></div><div class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../releases>Releases</a></div class="topnav-page-selection"><div class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../roadmap>Roadmap</a></div class="topnav-page-selection"><div class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../blogs>Blogs</a></div class="topnav-page-selection"><div class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../talks>Talks</a></div class="topnav-page-selection"><div class=versions-dropdown><div class=topnav-page-selection><a href>Project</a> <i class="fa fa-chevron-down"></i></div class="topnav-page-selection"><div class=versions-dropdown-content><ul><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../community>Community</a></li 
class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../spec>Spec</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../view-spec>View Spec</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../puffin-spec>Puffin Spec</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../multi-engine-support>Multi-Engine Support</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../how-to-release>How To Release</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../terms>Terms</a></li class="topnav-page-selection"></ul></div></div><div class=versions-dropdown><div class=topnav-page-selection><a href>Concepts</a> <i class="fa fa-chevron-down"></i></div class="topnav-page-selection"><div class=versions-dropdown-content><ul><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../catalog>Catalogs</a></li class="topnav-page-selection"></ul></div></div><div class=versions-dropdown><div class=topnav-page-selection><a href>ASF</a> <i class="fa fa-chevron-down"></i></div class="topnav-page-selection"><div class=versions-dropdown-content><ul><li class=topnav-page-selection><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Donate</a></li class="topnav-page-selection"><li class=topnav-page-selection><a target=_blank href=https://www.apache.org/events/current-event.html>Events</a></li class="topnav-page-selection"><li class=topnav-page-selection><a target=_blank href=https://www.apache.org/licenses/>License</a></li class="topnav-page-selection"><li 
class=topnav-page-selection><a target=_blank href=https://www.apache.org/security/>Security</a></li class="topnav-page-selection"><li class=topnav-page-selection><a target=_blank href=https://www.apache.org/foundation/thanks.html>Sponsors</a></li class="topnav-page-selection"></ul></div></div><div class=topnav-page-selection><a href=https://github.com/apache/iceberg target=_blank><img src=https://iceberg.apache.org/docs/fd-update-javadocs//img/GitHub-Mark.png target=_blank class=top-navbar-logo></a></div><div class=topnav-page-selection><a href=https://join.slack.com/t/apache-iceberg/shared_invite/zt-2561tq9qr-UtISlHgsdY3Virs3Z2_btQ target=_blank><img src=https://iceberg.apache.org/docs/fd-update-javadocs//img/Slack_Mark_Web.png target=_blank class=top-navbar-logo></a></div></div></topsection></nav><section><div id=search-results-container><ul id=search-results></ul></div></section><body dir=" ltr"><section><div class="grid-container leftnav-and-toc"><div class="sidebar markdown-body"><div id=full><ul><li><a href=../><span>Introduction</span></a></li><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Tables><span>Tables</span> |
| <i class="fa fa-chevron-right"></i> |
| <i class="fa fa-chevron-down"></i></a></li><div id=Tables class=collapse><ul class=sub-menu><li><a href=../branching/>Branching and Tagging</a></li><li><a href=../configuration/>Configuration</a></li><li><a href=../evolution/>Evolution</a></li><li><a href=../maintenance/>Maintenance</a></li><li><a href=../partitioning/>Partitioning</a></li><li><a href=../performance/>Performance</a></li><li><a href=../reliability/>Reliability</a></li><li><a href=../schemas/>Schemas</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Spark><span>Spark</span> |
| <i class="fa fa-chevron-right"></i> |
| <i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-ddl/>DDL</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><span>Flink</span> |
| <i class="fa fa-chevron-right"></i> |
| <i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a id=active href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://clickhouse.com/docs/en/engines/table-engines/integrations/iceberg><span>ClickHouse</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_blank href=https://docs.dremio.com/data-formats/apache-iceberg/><span>Dremio</span></a></li><li><a target=_blank href=https://docs.starrocks.io/en-us/latest/data_source/catalog/iceberg_catalog><span>StarRocks</span></a></li><li><a target=_blank href=https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html><span>Amazon Athena</span></a></li><li><a target=_blank href=https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-use-cluster.html><span>Amazon EMR</span></a></li><li><a target=_blank href=https://impala.apache.org/docs/build/html/topics/impala_iceberg.html><span>Impala</span></a></li><li><a target=_blank href=https://doris.apache.org/docs/dev/lakehouse/multi-catalog/iceberg><span>Doris</span></a></li><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Integrations><span>Integrations</span> |
| <i class="fa fa-chevron-right"></i> |
| <i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span> |
| <i class="fa fa-chevron-right"></i> |
| <i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Migration><span>Migration</span> |
| <i class="fa fa-chevron-right"></i> |
| <i class="fa fa-chevron-down"></i></a></li><div id=Migration class=collapse><ul class=sub-menu><li><a href=../table-migration/>Overview</a></li><li><a href=../hive-migration/>Hive Migration</a></li><li><a href=../delta-lake-migration/>Delta Lake Migration</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body><div class=margin-for-toc><h1 id=flink-writes>Flink Writes</h1><p>Iceberg support batch and streaming writes With <a href=https://flink.apache.org/>Apache Flink</a>’s DataStream API and Table API.</p><h2 id=writing-with-sql>Writing with SQL</h2><p>Iceberg support both <code>INSERT INTO</code> and <code>INSERT OVERWRITE</code>.</p><h3 id=insert-into><code>INSERT INTO</code></h3><p>To append new data to a table with a Flink streaming job, use <code>INSERT INTO</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#66d9ef>VALUES</span> (<span style=color:#ae81ff>1</span>, <span style=color:#e6db74>'a'</span>); |
| </span></span><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#66d9ef>SELECT</span> id, <span style=color:#66d9ef>data</span> <span style=color:#66d9ef>from</span> other_kafka_table; |
</span></span></code></pre></div><h3 id=insert-overwrite><code>INSERT OVERWRITE</code></h3><p>To replace data in the table with the result of a query, use <code>INSERT OVERWRITE</code> in a batch job (Flink streaming jobs do not support <code>INSERT OVERWRITE</code>). Overwrites are atomic operations for Iceberg tables.</p><p>Partitions that receive rows from the <code>SELECT</code> query are replaced; other partitions are left unchanged. For example:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> OVERWRITE sample <span style=color:#66d9ef>VALUES</span> (<span style=color:#ae81ff>1</span>, <span style=color:#e6db74>'a'</span>);
</span></span></code></pre></div><p>Iceberg also supports overwriting a given partition with the values returned by a <code>SELECT</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> OVERWRITE <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> PARTITION(<span style=color:#66d9ef>data</span><span style=color:#f92672>=</span><span style=color:#e6db74>'a'</span>) <span style=color:#66d9ef>SELECT</span> <span style=color:#ae81ff>6</span>;
</span></span></code></pre></div><p>For a partitioned Iceberg table, when every partition column is given a value in the <code>PARTITION</code> clause, the query writes into a static partition. When only some of the partition columns (a prefix of all partition columns) are given values in the <code>PARTITION</code> clause, the query result is written into dynamic partitions.
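</p><p>The replacement rules above can be illustrated with a small in-memory model. This is a sketch, not Iceberg’s implementation: it assumes the <code>sample(id INT, data STRING)</code> table from the examples is partitioned by the identity of <code>data</code>, and the names <code>OverwriteSketch</code>, <code>dynamicOverwrite</code> and <code>staticOverwrite</code> are hypothetical. In the model, a static <code>PARTITION</code> value is treated as a constant column added to every selected row, after which only the partitions that receive rows are replaced.</p>

```java
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class OverwriteSketch {

    // One row of the hypothetical table `sample(id INT, data STRING)`,
    // assumed to be partitioned by the identity of `data`.
    record Row(int id, String data) {}

    // Dynamic overwrite: every partition that receives at least one new row
    // is replaced; partitions absent from the new rows are left untouched.
    static Row[] dynamicOverwrite(Row[] current, Row[] incoming) {
        var touched = Arrays.stream(incoming).map(Row::data).collect(Collectors.toSet());
        Row[] kept = Arrays.stream(current)
                .filter(r -> !touched.contains(r.data()))
                .toArray(Row[]::new);
        return Stream.concat(Arrays.stream(kept), Arrays.stream(incoming)).toArray(Row[]::new);
    }

    // Static overwrite, e.g. PARTITION(data='a') SELECT 6: the static value is
    // attached to every selected row, then the rows are written as a dynamic overwrite.
    static Row[] staticOverwrite(Row[] current, String partitionValue, int[] selectedIds) {
        Row[] incoming = Arrays.stream(selectedIds)
                .mapToObj(id -> new Row(id, partitionValue))
                .toArray(Row[]::new);
        return dynamicOverwrite(current, incoming);
    }

    public static void main(String[] args) {
        Row[] table = { new Row(1, "a"), new Row(2, "a"), new Row(3, "b") };

        // Like INSERT OVERWRITE sample VALUES (1, 'a'): partition 'a' is replaced, 'b' survives.
        System.out.println(Arrays.toString(dynamicOverwrite(table, new Row[] { new Row(1, "a") })));

        // Like INSERT OVERWRITE ... PARTITION(data='a') SELECT 6
        System.out.println(Arrays.toString(staticOverwrite(table, "a", new int[] { 6 })));
    }
}
```

<p>In this model, overwriting the three-row table with <code>(1, 'a')</code> drops both old rows of partition <code>a</code>, while <code>(3, 'b')</code> is kept.</p><p>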
For an unpartitioned Iceberg table, <code>INSERT OVERWRITE</code> replaces all of its data.</p><h3 id=upsert><code>UPSERT</code></h3><p>Iceberg supports <code>UPSERT</code> based on the primary key when writing data into a table that uses the v2 format. There are two ways to enable upsert.</p><ol><li>Enable <code>UPSERT</code> mode with the table-level property <code>write.upsert.enabled</code>. The following example SQL statement sets the table property when creating a table. The property applies to all write paths to this table (batch or streaming) unless it is overridden by write options, as described below.</li></ol><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>TABLE</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> (
</span></span><span style=display:flex><span> <span style=color:#f92672>`</span>id<span style=color:#f92672>`</span> INT <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>'unique id'</span>,
| </span></span><span style=display:flex><span> <span style=color:#f92672>`</span><span style=color:#66d9ef>data</span><span style=color:#f92672>`</span> STRING <span style=color:#66d9ef>NOT</span> <span style=color:#66d9ef>NULL</span>, |
| </span></span><span style=display:flex><span> <span style=color:#66d9ef>PRIMARY</span> <span style=color:#66d9ef>KEY</span>(<span style=color:#f92672>`</span>id<span style=color:#f92672>`</span>) <span style=color:#66d9ef>NOT</span> ENFORCED |
| </span></span><span style=display:flex><span>) <span style=color:#66d9ef>with</span> (<span style=color:#e6db74>'format-version'</span><span style=color:#f92672>=</span><span style=color:#e6db74>'2'</span>, <span style=color:#e6db74>'write.upsert.enabled'</span><span style=color:#f92672>=</span><span style=color:#e6db74>'true'</span>); |
</span></span></code></pre></div><ol start=2><li>Enable <code>UPSERT</code> mode per write with <code>upsert-enabled</code> in the <a href=#write-options>write options</a>, which is more flexible than a table-level config. The table must still use the v2 format and declare a primary key when it is created.</li></ol><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> tableName <span style=color:#75715e>/*+ OPTIONS('upsert-enabled'='true') */</span>
| </span></span><span style=display:flex><span>... |
</span></span></code></pre></div><div class=info>OVERWRITE and UPSERT can’t be enabled together. In UPSERT mode, if the table is partitioned, the partition fields must be included in the equality fields.</div><h2 id=writing-with-datastream>Writing with DataStream</h2><p>Iceberg supports writing to Iceberg tables from different DataStream inputs.</p><h3 id=appending-data>Appending data</h3><p>The Iceberg sink natively supports writing <code>DataStream&lt;RowData&gt;</code> and <code>DataStream&lt;Row&gt;</code>.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>DataStream<span style=color:#f92672><</span>RowData<span style=color:#f92672>></span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span> |
| </span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span> |
| </span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>"hdfs://nn:8020/warehouse/path"</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span> |
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span> |
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>"Test Iceberg DataStream"</span><span style=color:#f92672>);</span> |
</span></span></code></pre></div><p>The Iceberg API also allows users to write a generic <code>DataStream&lt;T&gt;</code> to an Iceberg table; more examples can be found in this <a href=https://github.com/apache/iceberg/blob/master/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java>unit test</a>.</p><h3 id=overwrite-data>Overwrite data</h3><p>Set the <code>overwrite</code> flag in the <code>FlinkSink</code> builder to overwrite the data in an existing Iceberg table:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>DataStream<span style=color:#f92672><</span>RowData<span style=color:#f92672>></span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span> |
| </span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span> |
| </span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>"hdfs://nn:8020/warehouse/path"</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span> |
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>overwrite</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span> |
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>"Test Iceberg DataStream"</span><span style=color:#f92672>);</span> |
</span></span></code></pre></div><h3 id=upsert-data>Upsert data</h3><p>Set the <code>upsert</code> flag in the <code>FlinkSink</code> builder to upsert data into an existing Iceberg table. The table must use the v2 format and have a primary key.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>DataStream<span style=color:#f92672><</span>RowData<span style=color:#f92672>></span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span> |
| </span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span> |
| </span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>"hdfs://nn:8020/warehouse/path"</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span> |
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>upsert</span><span style=color:#f92672>(</span><span style=color:#66d9ef>true</span><span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span> |
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>"Test Iceberg DataStream"</span><span style=color:#f92672>);</span> |
</span></span></code></pre></div><div class=info>OVERWRITE and UPSERT can’t be enabled together. In UPSERT mode, if the table is partitioned, the partition fields must be included in the equality fields.</div><h3 id=write-with-avro-genericrecord>Write with Avro GenericRecord</h3><p>The Flink Iceberg sink provides <code>AvroGenericRecordToRowDataMapper</code>, which converts
Avro <code>GenericRecord</code> to Flink <code>RowData</code>. You can use the mapper to write
a DataStream of Avro GenericRecords to Iceberg.</p><p>Make sure the <code>flink-avro</code> jar is included in the classpath.
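</p><p>A quick way to confirm that the <code>flink-avro</code> jar is present is to probe the classpath before building the job. The helper below, <code>AvroClasspathCheck</code>, is hypothetical and not part of Iceberg or Flink, and the classes it probes are an assumed list: <code>GenericRecordAvroTypeInfo</code>, which ships in <code>flink-avro</code>, and Avro’s <code>GenericRecord</code>.</p>

```java
public final class AvroClasspathCheck {

    // Classes probed by this sketch (an assumed list): GenericRecordAvroTypeInfo
    // is packaged in flink-avro; GenericRecord comes from the Avro library.
    static final String[] REQUIRED_CLASSES = {
        "org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo",
        "org.apache.avro.generic.GenericRecord"
    };

    // Returns the first class name that cannot be loaded, or null if all of them load.
    static String firstMissingClass(String... classNames) {
        ClassLoader loader = AvroClasspathCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                // initialize = false: load the class without running its static initializers.
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                return name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String missing = firstMissingClass(REQUIRED_CLASSES);
        if (missing == null) {
            System.out.println("Required Avro classes are on the classpath.");
        } else {
            System.out.println("Missing from the classpath: " + missing);
        }
    }
}
```

<p>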
The shaded <code>iceberg-flink-runtime</code> bundle jar can’t be used,
because it shades (relocates) the Avro package.
Use the non-shaded <code>iceberg-flink</code> jar instead.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>DataStream<span style=color:#f92672><</span>org<span style=color:#f92672>.</span><span style=color:#a6e22e>apache</span><span style=color:#f92672>.</span><span style=color:#a6e22e>avro</span><span style=color:#f92672>.</span><span style=color:#a6e22e>generic</span><span style=color:#f92672>.</span><span style=color:#a6e22e>GenericRecord</span><span style=color:#f92672>></span> dataStream <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>Schema icebergSchema <span style=color:#f92672>=</span> table<span style=color:#f92672>.</span><span style=color:#a6e22e>schema</span><span style=color:#f92672>();</span> |
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span> |
</span></span><span style=display:flex><span><span style=color:#75715e>// An Avro schema converted from the Iceberg schema can't be used here
</span></span></span><span style=display:flex><span><span style=color:#75715e>// because of the precision difference between how the Iceberg schema (micro)
</span></span></span><span style=display:flex><span><span style=color:#75715e>// and Flink AvroToRowDataConverters (milli) handle the time type.
</span></span></span><span style=display:flex><span><span style=color:#75715e>// Instead, use the Avro schema defined directly for the input records.
</span></span></span><span style=display:flex><span><span style=color:#75715e>// See AvroGenericRecordToRowDataMapper Javadoc for more details.
</span></span></span><span style=display:flex><span><span style=color:#75715e></span>org<span style=color:#f92672>.</span><span style=color:#a6e22e>apache</span><span style=color:#f92672>.</span><span style=color:#a6e22e>avro</span><span style=color:#f92672>.</span><span style=color:#a6e22e>Schema</span> avroSchema <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>GenericRecordAvroTypeInfo avroTypeInfo <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> GenericRecordAvroTypeInfo<span style=color:#f92672>(</span>avroSchema<span style=color:#f92672>);</span> |
| </span></span><span style=display:flex><span>RowType rowType <span style=color:#f92672>=</span> FlinkSchemaUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>convert</span><span style=color:#f92672>(</span>icebergSchema<span style=color:#f92672>);</span> |
| </span></span><span style=display:flex><span> |
| </span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>builderFor</span><span style=color:#f92672>(</span> |
| </span></span><span style=display:flex><span> dataStream<span style=color:#f92672>,</span> |
| </span></span><span style=display:flex><span> AvroGenericRecordToRowDataMapper<span style=color:#f92672>.</span><span style=color:#a6e22e>forAvroSchema</span><span style=color:#f92672>(</span>avroSchema<span style=color:#f92672>),</span> |
| </span></span><span style=display:flex><span> FlinkCompatibilityUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>toTypeInfo</span><span style=color:#f92672>(</span>rowType<span style=color:#f92672>))</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>table</span><span style=color:#f92672>(</span>table<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span> |
| </span></span></code></pre></div><h3 id=branch-writes>Branch Writes</h3><p>Writing to branches in Iceberg tables is also supported via the <code>toBranch</code> API in <code>FlinkSink</code>. |
| For more information on branches, refer to <a href=../../tables/branching>branches</a>.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>toBranch</span><span style=color:#f92672>(</span><span style=color:#e6db74>"audit-branch"</span><span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span> |
| </span></span></code></pre></div><h3 id=metrics>Metrics</h3><p>The Flink Iceberg sink provides the following Flink metrics.</p><p>Parallel writer metrics are added under the <code>IcebergStreamWriter</code> subgroup. |
| They have the following key-value tags:</p><ul><li>table: full table name (e.g. iceberg.my_db.my_table)</li><li>subtask_index: writer subtask index, starting from 0</li></ul><table><thead><tr><th>Metric name</th><th>Metric type</th><th>Description</th></tr></thead><tbody><tr><td>lastFlushDurationMs</td><td>Gauge</td><td>Time (in milliseconds) that the writer subtask took to flush and upload its files during the last checkpoint.</td></tr><tr><td>flushedDataFiles</td><td>Counter</td><td>Number of data files flushed and uploaded.</td></tr><tr><td>flushedDeleteFiles</td><td>Counter</td><td>Number of delete files flushed and uploaded.</td></tr><tr><td>flushedReferencedDataFiles</td><td>Counter</td><td>Number of data files referenced by the flushed delete files.</td></tr><tr><td>dataFilesSizeHistogram</td><td>Histogram</td><td>Distribution of data file sizes (in bytes).</td></tr><tr><td>deleteFilesSizeHistogram</td><td>Histogram</td><td>Distribution of delete file sizes (in bytes).</td></tr></tbody></table><p>Committer metrics are added under the <code>IcebergFilesCommitter</code> subgroup. |
| They have the following key-value tags:</p><ul><li>table: full table name (e.g. iceberg.my_db.my_table)</li></ul><table><thead><tr><th>Metric name</th><th>Metric type</th><th>Description</th></tr></thead><tbody><tr><td>lastCheckpointDurationMs</td><td>Gauge</td><td>Time (in milliseconds) that the committer operator took to checkpoint its state.</td></tr><tr><td>lastCommitDurationMs</td><td>Gauge</td><td>Time (in milliseconds) that the last Iceberg table commit took.</td></tr><tr><td>committedDataFilesCount</td><td>Counter</td><td>Number of data files committed.</td></tr><tr><td>committedDataFilesRecordCount</td><td>Counter</td><td>Number of records contained in the committed data files.</td></tr><tr><td>committedDataFilesByteCount</td><td>Counter</td><td>Number of bytes contained in the committed data files.</td></tr><tr><td>committedDeleteFilesCount</td><td>Counter</td><td>Number of delete files committed.</td></tr><tr><td>committedDeleteFilesRecordCount</td><td>Counter</td><td>Number of records contained in the committed delete files.</td></tr><tr><td>committedDeleteFilesByteCount</td><td>Counter</td><td>Number of bytes contained in the committed delete files.</td></tr><tr><td>elapsedSecondsSinceLastSuccessfulCommit</td><td>Gauge</td><td>Elapsed time (in seconds) since the last successful Iceberg commit.</td></tr></tbody></table><p><code>elapsedSecondsSinceLastSuccessfulCommit</code> is an ideal alerting metric |
| to detect failed or missing Iceberg commits.</p><ul><li>Iceberg commits happen after a successful Flink checkpoint, in the <code>notifyCheckpointComplete</code> callback. |
| As a result, an Iceberg commit can fail (for whatever reason) even though the Flink checkpoint succeeded.</li><li>It is also possible that <code>notifyCheckpointComplete</code> is never triggered (e.g. due to a bug). |
| In that case, no Iceberg commit is attempted at all.</li></ul><p>If the checkpoint interval (and thus the expected Iceberg commit interval) is 5 minutes, set up an alert with a rule like <code>elapsedSecondsSinceLastSuccessfulCommit > 3600</code> (60 minutes) to detect failed or missing Iceberg commits in the past hour.</p><h2 id=options>Options</h2><h3 id=write-options>Write options</h3><p>Flink write options are passed when configuring the <code>FlinkSink</code>, for example:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>Builder</span> builder <span style=color:#f92672>=</span> FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRow</span><span style=color:#f92672>(</span>dataStream<span style=color:#f92672>,</span> SimpleDataUtil<span style=color:#f92672>.</span><span style=color:#a6e22e>FLINK_SCHEMA</span><span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>table</span><span style=color:#f92672>(</span>table<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>set</span><span style=color:#f92672>(</span><span style=color:#e6db74>"write-format"</span><span style=color:#f92672>,</span> <span style=color:#e6db74>"orc"</span><span style=color:#f92672>)</span> |
| </span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>set</span><span style=color:#f92672>(</span>FlinkWriteOptions<span style=color:#f92672>.</span><span style=color:#a6e22e>OVERWRITE_MODE</span><span style=color:#f92672>.</span><span style=color:#a6e22e>key</span><span style=color:#f92672>(),</span> <span style=color:#e6db74>"true"</span><span style=color:#f92672>);</span> |
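</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#75715e>// Equivalent alternative (sketch; assumes java.util.Map and java.util.HashMap</span>
</span></span><span style=display:flex><span><span style=color:#75715e>// are imported): collect the same options in a map and apply them with setAll.</span>
</span></span><span style=display:flex><span>Map<span style=color:#f92672>&lt;</span>String<span style=color:#f92672>,</span> String<span style=color:#f92672>&gt;</span> writeOptions <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> HashMap<span style=color:#f92672>&lt;&gt;();</span>
</span></span><span style=display:flex><span>writeOptions<span style=color:#f92672>.</span><span style=color:#a6e22e>put</span><span style=color:#f92672>(</span><span style=color:#e6db74>"write-format"</span><span style=color:#f92672>,</span> <span style=color:#e6db74>"orc"</span><span style=color:#f92672>);</span>
</span></span><span style=display:flex><span>writeOptions<span style=color:#f92672>.</span><span style=color:#a6e22e>put</span><span style=color:#f92672>(</span>FlinkWriteOptions<span style=color:#f92672>.</span><span style=color:#a6e22e>OVERWRITE_MODE</span><span style=color:#f92672>.</span><span style=color:#a6e22e>key</span><span style=color:#f92672>(),</span> <span style=color:#e6db74>"true"</span><span style=color:#f92672>);</span>
</span></span><span style=display:flex><span><span style=color:#75715e>// append() then adds the configured Iceberg sink to the Flink job.</span>
</span></span><span style=display:flex><span>builder<span style=color:#f92672>.</span><span style=color:#a6e22e>setAll</span><span style=color:#f92672>(</span>writeOptions<span style=color:#f92672>).</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>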
| </span></span></code></pre></div><p>For Flink SQL, write options can be passed in via SQL hints like this:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> tableName <span style=color:#75715e>/*+ OPTIONS('upsert-enabled'='true') */</span> |
| </span></span><span style=display:flex><span>... |
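</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#75715e>-- Sketch: several options can be combined in one hint; sourceTable is a hypothetical source table.</span>
</span></span><span style=display:flex><span><span style=color:#75715e>-- 134217728 bytes = 128 MiB target data file size.</span>
</span></span><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> tableName <span style=color:#75715e>/*+ OPTIONS('write-format'='orc', 'target-file-size-bytes'='134217728') */</span>
</span></span><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> sourceTable;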
| </span></span></code></pre></div><p>Check out all the options here: <a href=../flink-configuration#write-options>write-options</a></p></div><div id=toc class=markdown-body><div id=full><nav id=TableOfContents><ul><li><a href=#writing-with-sql>Writing with SQL</a><ul><li><a href=#insert-into><code>INSERT INTO</code></a></li><li><a href=#insert-overwrite><code>INSERT OVERWRITE</code></a></li><li><a href=#upsert><code>UPSERT</code></a></li></ul></li><li><a href=#writing-with-datastream>Writing with DataStream</a><ul><li><a href=#appending-data>Appending data.</a></li><li><a href=#overwrite-data>Overwrite data</a></li><li><a href=#upsert-data>Upsert data</a></li><li><a href=#write-with-avro-genericrecord>Write with Avro GenericRecord</a></li><li><a href=#branch-writes>Branch Writes</a></li><li><a href=#metrics>Metrics</a></li></ul></li><li><a href=#options>Options</a><ul><li><a href=#write-options>Write options</a></li></ul></li></ul></nav></div></div></div></div></section></body><script src=https://iceberg.apache.org/docs/fd-update-javadocs//js/jquery-1.11.0.js></script> |
| <script src=https://iceberg.apache.org/docs/fd-update-javadocs//js/jquery.easing.min.js></script> |
| <script type=text/javascript src=https://iceberg.apache.org/docs/fd-update-javadocs//js/search.js></script> |
| <script src=https://iceberg.apache.org/docs/fd-update-javadocs//js/bootstrap.min.js></script> |
| <script src=https://iceberg.apache.org/docs/fd-update-javadocs//js/iceberg-theme.js></script></html> |