<!doctype html><html><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><meta name=description content><meta name=author content><title>Flink Getting Started</title><link href=../css/bootstrap.css rel=stylesheet><link href=../css/markdown.css rel=stylesheet><link href=../css/katex.min.css rel=stylesheet><link href=../css/iceberg-theme.css rel=stylesheet><link href=../font-awesome-4.7.0/css/font-awesome.min.css rel=stylesheet type=text/css><link href="//fonts.googleapis.com/css?family=Lato:300,400,700,300italic,400italic,700italic" rel=stylesheet type=text/css><link href=../css/termynal.css rel=stylesheet></head><body><head><script>function addAnchor(e){e.insertAdjacentHTML("beforeend",`<a href="#${e.id}" class="anchortag" ariaLabel="Anchor"> 🔗 </a>`)}document.addEventListener("DOMContentLoaded",function(){var e=document.querySelectorAll("h1[id], h2[id], h3[id], h4[id]");e&&e.forEach(addAnchor)})</script></head><nav class="navbar navbar-default" role=navigation><topsection><div class=navbar-fixed-top><div><button type=button class=navbar-toggle data-toggle=collapse data-target=div.sidebar>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button>
<a class="page-scroll navbar-brand" href=https://iceberg.apache.org/><img class=top-navbar-logo src=https://iceberg.apache.org/docs/fd-update-javadocs//img/iceberg-logo-icon.png> Apache Iceberg</a></div><div><input type=search class=form-control id=search-input placeholder=Search... maxlength=64 data-hotkeys=s/></div><div class=versions-dropdown><span>1.4.1</span> <i class="fa fa-chevron-down"></i><div class=versions-dropdown-content><ul><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../latest>latest</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.4.1>1.4.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.4.0>1.4.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.3.1>1.3.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.3.0>1.3.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.2.1>1.2.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.2.0>1.2.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.1.0>1.1.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../1.0.0>1.0.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.14.1>0.14.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.14.0>0.14.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.13.2>0.13.2</a></li><li class=versions-dropdown-selection><a 
href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.13.1>0.13.1</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.13.0>0.13.0</a></li><li class=versions-dropdown-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../0.12.1>0.12.1</a></li></ul></div></div></div><div class="navbar-menu-fixed-top navbar-pages-group"><div class=versions-dropdown><div class=topnav-page-selection><a href>Quickstart</a> <i class="fa fa-chevron-down"></i></div class="topnav-page-selection"><div class=versions-dropdown-content><ul><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../hive-quickstart>Hive</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../spark-quickstart>Spark</a></li class="topnav-page-selection"></ul></div></div><div class=topnav-page-selection><a id=active href=https://iceberg.apache.org/docs/fd-update-javadocs/../../docs/latest>Docs</a></div><div class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../releases>Releases</a></div class="topnav-page-selection"><div class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../roadmap>Roadmap</a></div class="topnav-page-selection"><div class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../blogs>Blogs</a></div class="topnav-page-selection"><div class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../talks>Talks</a></div class="topnav-page-selection"><div class=versions-dropdown><div class=topnav-page-selection><a href>Project</a> <i class="fa fa-chevron-down"></i></div class="topnav-page-selection"><div class=versions-dropdown-content><ul><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../community>Community</a></li 
class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../spec>Spec</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../view-spec>View Spec</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../puffin-spec>Puffin Spec</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../multi-engine-support>Multi-Engine Support</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../how-to-release>How To Release</a></li class="topnav-page-selection"><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../terms>Terms</a></li class="topnav-page-selection"></ul></div></div><div class=versions-dropdown><div class=topnav-page-selection><a href>Concepts</a> <i class="fa fa-chevron-down"></i></div class="topnav-page-selection"><div class=versions-dropdown-content><ul><li class=topnav-page-selection><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../catalog>Catalogs</a></li class="topnav-page-selection"></ul></div></div><div class=versions-dropdown><div class=topnav-page-selection><a href>ASF</a> <i class="fa fa-chevron-down"></i></div class="topnav-page-selection"><div class=versions-dropdown-content><ul><li class=topnav-page-selection><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Donate</a></li class="topnav-page-selection"><li class=topnav-page-selection><a target=_blank href=https://www.apache.org/events/current-event.html>Events</a></li class="topnav-page-selection"><li class=topnav-page-selection><a target=_blank href=https://www.apache.org/licenses/>License</a></li class="topnav-page-selection"><li 
class=topnav-page-selection><a target=_blank href=https://www.apache.org/security/>Security</a></li class="topnav-page-selection"><li class=topnav-page-selection><a target=_blank href=https://www.apache.org/foundation/thanks.html>Sponsors</a></li class="topnav-page-selection"></ul></div></div><div class=topnav-page-selection><a href=https://github.com/apache/iceberg target=_blank><img src=https://iceberg.apache.org/docs/fd-update-javadocs//img/GitHub-Mark.png target=_blank class=top-navbar-logo></a></div><div class=topnav-page-selection><a href=https://join.slack.com/t/apache-iceberg/shared_invite/zt-2561tq9qr-UtISlHgsdY3Virs3Z2_btQ target=_blank><img src=https://iceberg.apache.org/docs/fd-update-javadocs//img/Slack_Mark_Web.png target=_blank class=top-navbar-logo></a></div></div></topsection></nav><section><div id=search-results-container><ul id=search-results></ul></div></section><body dir=" ltr"><section><div class="grid-container leftnav-and-toc"><div class="sidebar markdown-body"><div id=full><ul><li><a href=../><span>Introduction</span></a></li><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Tables><span>Tables</span>
<i class="fa fa-chevron-right"></i>
<i class="fa fa-chevron-down"></i></a></li><div id=Tables class=collapse><ul class=sub-menu><li><a href=../branching/>Branching and Tagging</a></li><li><a href=../configuration/>Configuration</a></li><li><a href=../evolution/>Evolution</a></li><li><a href=../maintenance/>Maintenance</a></li><li><a href=../partitioning/>Partitioning</a></li><li><a href=../performance/>Performance</a></li><li><a href=../reliability/>Reliability</a></li><li><a href=../schemas/>Schemas</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Spark><span>Spark</span>
<i class="fa fa-chevron-right"></i>
<i class="fa fa-chevron-down"></i></a></li><div id=Spark class=collapse><ul class=sub-menu><li><a href=../getting-started/>Getting Started</a></li><li><a href=../spark-ddl/>DDL</a></li><li><a href=../spark-procedures/>Procedures</a></li><li><a href=../spark-queries/>Queries</a></li><li><a href=../spark-structured-streaming/>Structured Streaming</a></li><li><a href=../spark-writes/>Writes</a></li></ul></div><li><a class=chevron-toggle data-toggle=collapse data-parent=full href=#Flink><span>Flink</span>
<i class="fa fa-chevron-right"></i>
<i class="fa fa-chevron-down"></i></a></li><div id=Flink class="collapse in"><ul class=sub-menu><li><a id=active href=../flink/>Flink Getting Started</a></li><li><a href=../flink-connector/>Flink Connector</a></li><li><a href=../flink-ddl/>Flink DDL</a></li><li><a href=../flink-queries/>Flink Queries</a></li><li><a href=../flink-writes/>Flink Writes</a></li><li><a href=../flink-actions/>Flink Actions</a></li><li><a href=../flink-configuration/>Flink Configuration</a></li></ul></div><li><a href=../hive/><span>Hive</span></a></li><li><a target=_blank href=https://trino.io/docs/current/connector/iceberg.html><span>Trino</span></a></li><li><a target=_blank href=https://clickhouse.com/docs/en/engines/table-engines/integrations/iceberg><span>ClickHouse</span></a></li><li><a target=_blank href=https://prestodb.io/docs/current/connector/iceberg.html><span>Presto</span></a></li><li><a target=_blank href=https://docs.dremio.com/data-formats/apache-iceberg/><span>Dremio</span></a></li><li><a target=_blank href=https://docs.starrocks.io/en-us/latest/data_source/catalog/iceberg_catalog><span>StarRocks</span></a></li><li><a target=_blank href=https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html><span>Amazon Athena</span></a></li><li><a target=_blank href=https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-use-cluster.html><span>Amazon EMR</span></a></li><li><a target=_blank href=https://impala.apache.org/docs/build/html/topics/impala_iceberg.html><span>Impala</span></a></li><li><a target=_blank href=https://doris.apache.org/docs/dev/lakehouse/multi-catalog/iceberg><span>Doris</span></a></li><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Integrations><span>Integrations</span>
<i class="fa fa-chevron-right"></i>
<i class="fa fa-chevron-down"></i></a></li><div id=Integrations class=collapse><ul class=sub-menu><li><a href=../aws/>AWS</a></li><li><a href=../dell/>Dell</a></li><li><a href=../jdbc/>JDBC</a></li><li><a href=../nessie/>Nessie</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#API><span>API</span>
<i class="fa fa-chevron-right"></i>
<i class="fa fa-chevron-down"></i></a></li><div id=API class=collapse><ul class=sub-menu><li><a href=../java-api-quickstart/>Java Quickstart</a></li><li><a href=../api/>Java API</a></li><li><a href=../custom-catalog/>Java Custom Catalog</a></li></ul></div><li><a class="chevron-toggle collapsed" data-toggle=collapse data-parent=full href=#Migration><span>Migration</span>
<i class="fa fa-chevron-right"></i>
<i class="fa fa-chevron-down"></i></a></li><div id=Migration class=collapse><ul class=sub-menu><li><a href=../table-migration/>Overview</a></li><li><a href=../hive-migration/>Hive Migration</a></li><li><a href=../delta-lake-migration/>Delta Lake Migration</a></li></ul></div><li><a href=https://iceberg.apache.org/docs/fd-update-javadocs/../../javadoc/latest><span>Javadoc</span></a></li><li><a target=_blank href=https://py.iceberg.apache.org/><span>PyIceberg</span></a></li></div></div><div id=content class=markdown-body><div class=margin-for-toc><h1 id=flink>Flink</h1><p>Apache Iceberg supports both <a href=https://flink.apache.org/>Apache Flink</a>&rsquo;s DataStream API and Table API. See the <a href=https://iceberg.apache.org/multi-engine-support/#apache-flink>Multi-Engine Support</a> page for details on Iceberg&rsquo;s Apache Flink integration.</p><table><thead><tr><th>Feature support</th><th>Flink</th><th>Notes</th></tr></thead><tbody><tr><td><a href=#creating-catalogs-and-using-catalogs>SQL create catalog</a></td><td>✔️</td><td></td></tr><tr><td><a href=#create-database>SQL create database</a></td><td>✔️</td><td></td></tr><tr><td><a href=#create-table>SQL create table</a></td><td>✔️</td><td></td></tr><tr><td><a href=#create-table-like>SQL create table like</a></td><td>✔️</td><td></td></tr><tr><td><a href=#alter-table>SQL alter table</a></td><td>✔️</td><td>Only altering table properties is supported; column and partition changes are not supported</td></tr><tr><td><a href=#drop-table>SQL drop table</a></td><td>✔️</td><td></td></tr><tr><td><a href=#querying-with-sql>SQL select</a></td><td>✔️</td><td>Supports both streaming and batch mode</td></tr><tr><td><a href=#insert-into>SQL insert into</a></td><td>✔️</td><td>Supports both streaming and batch mode</td></tr><tr><td><a href=#insert-overwrite>SQL insert overwrite</a></td><td>✔️</td><td></td></tr><tr><td><a href=#reading-with-datastream>DataStream read</a></td><td>✔️</td><td></td></tr><tr><td><a
href=#appending-data>DataStream append</a></td><td>✔️</td><td></td></tr><tr><td><a href=#overwrite-data>DataStream overwrite</a></td><td>✔️</td><td></td></tr><tr><td><a href=#inspecting-tables>Metadata tables</a></td><td>✔️</td><td></td></tr><tr><td><a href=#rewrite-files-action>Rewrite files action</a></td><td>✔️</td><td></td></tr></tbody></table><h2 id=preparation-when-using-flink-sql-client>Preparation when using Flink SQL Client</h2><p>To create Iceberg tables in Flink, we recommend using the <a href=https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html>Flink SQL Client</a>, because it makes the concepts easier to understand.</p><p>Download Flink from the <a href=https://flink.apache.org/downloads.html>Apache download page</a>. Iceberg uses Scala 2.12 when compiling the Apache <code>iceberg-flink-runtime</code> jar, so we recommend using Flink 1.16 bundled with Scala 2.12.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-bash data-lang=bash><span style=display:flex><span>FLINK_VERSION<span style=color:#f92672>=</span>1.16.1
</span></span><span style=display:flex><span>SCALA_VERSION<span style=color:#f92672>=</span>2.12
</span></span><span style=display:flex><span>APACHE_FLINK_URL<span style=color:#f92672>=</span>https://archive.apache.org/dist/flink
</span></span><span style=display:flex><span>wget <span style=color:#e6db74>${</span>APACHE_FLINK_URL<span style=color:#e6db74>}</span>/flink-<span style=color:#e6db74>${</span>FLINK_VERSION<span style=color:#e6db74>}</span>/flink-<span style=color:#e6db74>${</span>FLINK_VERSION<span style=color:#e6db74>}</span>-bin-scala_<span style=color:#e6db74>${</span>SCALA_VERSION<span style=color:#e6db74>}</span>.tgz
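</span></span><span style=display:flex><span>tar xzvf flink-<span style=color:#e6db74>${</span>FLINK_VERSION<span style=color:#e6db74>}</span>-bin-scala_<span style=color:#e6db74>${</span>SCALA_VERSION<span style=color:#e6db74>}</span>.tgz
</span></span><span style=display:flex><span>cd flink-<span style=color:#e6db74>${</span>FLINK_VERSION<span style=color:#e6db74>}</span>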
</span></span></code></pre></div><p>Start a standalone Flink cluster within a Hadoop environment:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-bash data-lang=bash><span style=display:flex><span><span style=color:#75715e># Download Hadoop and set HADOOP_HOME to the root directory of the unpacked binary package.</span>
</span></span><span style=display:flex><span>APACHE_HADOOP_URL<span style=color:#f92672>=</span>https://archive.apache.org/dist/hadoop
</span></span><span style=display:flex><span>HADOOP_VERSION<span style=color:#f92672>=</span>2.8.5
</span></span><span style=display:flex><span>wget <span style=color:#e6db74>${</span>APACHE_HADOOP_URL<span style=color:#e6db74>}</span>/common/hadoop-<span style=color:#e6db74>${</span>HADOOP_VERSION<span style=color:#e6db74>}</span>/hadoop-<span style=color:#e6db74>${</span>HADOOP_VERSION<span style=color:#e6db74>}</span>.tar.gz
</span></span><span style=display:flex><span>tar xzvf hadoop-<span style=color:#e6db74>${</span>HADOOP_VERSION<span style=color:#e6db74>}</span>.tar.gz
</span></span><span style=display:flex><span>HADOOP_HOME<span style=color:#f92672>=</span><span style=color:#e6db74>`</span>pwd<span style=color:#e6db74>`</span>/hadoop-<span style=color:#e6db74>${</span>HADOOP_VERSION<span style=color:#e6db74>}</span>
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>export HADOOP_CLASSPATH<span style=color:#f92672>=</span><span style=color:#e6db74>`</span>$HADOOP_HOME/bin/hadoop classpath<span style=color:#e6db74>`</span>
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#75715e># Start the flink standalone cluster</span>
</span></span><span style=display:flex><span>./bin/start-cluster.sh
</span></span></code></pre></div><p>Start the Flink SQL client. The Iceberg project contains a separate <code>flink-runtime</code> module that generates a bundled jar, which the Flink SQL client can load directly. To build the <code>flink-runtime</code> bundled jar manually, build the <code>iceberg</code> project; the jar is generated under <code>&lt;iceberg-root-dir>/flink-runtime/build/libs</code>. Alternatively, download the <code>flink-runtime</code> jar from the <a href=https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.4.1/>Apache repository</a>.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-bash data-lang=bash><span style=display:flex><span><span style=color:#75715e># HADOOP_HOME is your Hadoop root directory after unpacking the binary package.</span>
</span></span><span style=display:flex><span>export HADOOP_CLASSPATH<span style=color:#f92672>=</span><span style=color:#e6db74>`</span>$HADOOP_HOME/bin/hadoop classpath<span style=color:#e6db74>`</span>
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>./bin/sql-client.sh embedded -j &lt;flink-runtime-directory&gt;/iceberg-flink-runtime-1.16-1.4.1.jar shell
</span></span></code></pre></div><p>By default, Iceberg ships with Hadoop jars for the Hadoop catalog. To use the Hive catalog, load the Hive jars when opening the Flink SQL client. Flink provides a <a href=https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.1/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar>bundled Hive jar</a> for the SQL client. The following example shows how to download the dependencies and get started:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-bash data-lang=bash><span style=display:flex><span><span style=color:#75715e># HADOOP_HOME is your Hadoop root directory after unpacking the binary package.</span>
</span></span><span style=display:flex><span>export HADOOP_CLASSPATH<span style=color:#f92672>=</span><span style=color:#e6db74>`</span>$HADOOP_HOME/bin/hadoop classpath<span style=color:#e6db74>`</span>
</span></span><span style=display:flex><span>
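</span></span><span style=display:flex><span>ICEBERG_VERSION<span style=color:#f92672>=</span>1.4.1
</span></span><span style=display:flex><span>FLINK_VERSION_MAJOR<span style=color:#f92672>=</span>1.16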
</span></span><span style=display:flex><span>MAVEN_URL<span style=color:#f92672>=</span>https://repo1.maven.org/maven2
</span></span><span style=display:flex><span>ICEBERG_MAVEN_URL<span style=color:#f92672>=</span><span style=color:#e6db74>${</span>MAVEN_URL<span style=color:#e6db74>}</span>/org/apache/iceberg
</span></span><span style=display:flex><span>ICEBERG_PACKAGE<span style=color:#f92672>=</span>iceberg-flink-runtime
</span></span><span style=display:flex><span>wget <span style=color:#e6db74>${</span>ICEBERG_MAVEN_URL<span style=color:#e6db74>}</span>/<span style=color:#e6db74>${</span>ICEBERG_PACKAGE<span style=color:#e6db74>}</span>-<span style=color:#e6db74>${</span>FLINK_VERSION_MAJOR<span style=color:#e6db74>}</span>/<span style=color:#e6db74>${</span>ICEBERG_VERSION<span style=color:#e6db74>}</span>/<span style=color:#e6db74>${</span>ICEBERG_PACKAGE<span style=color:#e6db74>}</span>-<span style=color:#e6db74>${</span>FLINK_VERSION_MAJOR<span style=color:#e6db74>}</span>-<span style=color:#e6db74>${</span>ICEBERG_VERSION<span style=color:#e6db74>}</span>.jar -P lib/
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>HIVE_VERSION<span style=color:#f92672>=</span>2.3.9
</span></span><span style=display:flex><span>SCALA_VERSION<span style=color:#f92672>=</span>2.12
</span></span><span style=display:flex><span>FLINK_VERSION<span style=color:#f92672>=</span>1.16.1
</span></span><span style=display:flex><span>FLINK_CONNECTOR_URL<span style=color:#f92672>=</span><span style=color:#e6db74>${</span>MAVEN_URL<span style=color:#e6db74>}</span>/org/apache/flink
</span></span><span style=display:flex><span>FLINK_CONNECTOR_PACKAGE<span style=color:#f92672>=</span>flink-sql-connector-hive
</span></span><span style=display:flex><span>wget <span style=color:#e6db74>${</span>FLINK_CONNECTOR_URL<span style=color:#e6db74>}</span>/<span style=color:#e6db74>${</span>FLINK_CONNECTOR_PACKAGE<span style=color:#e6db74>}</span>-<span style=color:#e6db74>${</span>HIVE_VERSION<span style=color:#e6db74>}</span>_<span style=color:#e6db74>${</span>SCALA_VERSION<span style=color:#e6db74>}</span>/<span style=color:#e6db74>${</span>FLINK_VERSION<span style=color:#e6db74>}</span>/<span style=color:#e6db74>${</span>FLINK_CONNECTOR_PACKAGE<span style=color:#e6db74>}</span>-<span style=color:#e6db74>${</span>HIVE_VERSION<span style=color:#e6db74>}</span>_<span style=color:#e6db74>${</span>SCALA_VERSION<span style=color:#e6db74>}</span>-<span style=color:#e6db74>${</span>FLINK_VERSION<span style=color:#e6db74>}</span>.jar -P lib/
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>./bin/sql-client.sh embedded shell
</span></span></code></pre></div><h2 id=flinks-python-api>Flink&rsquo;s Python API</h2><div class=info>PyFlink 1.16.1 <a href=https://issues.apache.org/jira/browse/FLINK-28786>does not work on macOS with an M1 CPU</a></div><p>Install the Apache Flink dependency using <code>pip</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-bash data-lang=bash><span style=display:flex><span>pip install apache-flink<span style=color:#f92672>==</span>1.16.1
</span></span></code></pre></div><p>Provide a <code>file://</code> path to the <code>iceberg-flink-runtime</code> jar, which you can obtain by building the project and looking in <code>&lt;iceberg-root-dir>/flink-runtime/build/libs</code>, or by downloading it from the <a href=https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/>official Apache repository</a>. Third-party jars can be added to <code>pyflink</code> in either of two ways:</p><ul><li><code>env.add_jars("file:///my/jar/path/connector.jar")</code></li><li><code>table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar")</code></li></ul><p>This is also described in the official Flink <a href=https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/dependency_management/>docs</a>. The example below uses <code>env.add_jars(..)</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-python data-lang=python><span style=display:flex><span><span style=color:#f92672>import</span> os
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#f92672>from</span> pyflink.datastream <span style=color:#f92672>import</span> StreamExecutionEnvironment
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>env <span style=color:#f92672>=</span> StreamExecutionEnvironment<span style=color:#f92672>.</span>get_execution_environment()
</span></span><span style=display:flex><span>iceberg_flink_runtime_jar <span style=color:#f92672>=</span> os<span style=color:#f92672>.</span>path<span style=color:#f92672>.</span>join(os<span style=color:#f92672>.</span>getcwd(), <span style=color:#e6db74>&#34;iceberg-flink-runtime-1.16-1.4.1.jar&#34;</span>)
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>env<span style=color:#f92672>.</span>add_jars(<span style=color:#e6db74>&#34;file://</span><span style=color:#e6db74>{}</span><span style=color:#e6db74>&#34;</span><span style=color:#f92672>.</span>format(iceberg_flink_runtime_jar))
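# Alternative sketch (not from the original guide): pathlib builds a well-formed
# file:// URI and percent-encodes characters such as spaces in the path.
from pathlib import Path

jar_uri = (Path.cwd() / "iceberg-flink-runtime-1.16-1.4.1.jar").as_uri()
# add_jars accepts several URIs at once, e.g. the runtime jar plus a connector:
# env.add_jars(jar_uri, "file:///my/jar/path/connector.jar")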
</span></span></code></pre></div><p>Next, create a <code>StreamTableEnvironment</code> and execute Flink SQL statements. The example below shows how to create a custom catalog via the Python Table API:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-python data-lang=python><span style=display:flex><span><span style=color:#f92672>from</span> pyflink.table <span style=color:#f92672>import</span> StreamTableEnvironment
</span></span><span style=display:flex><span>table_env <span style=color:#f92672>=</span> StreamTableEnvironment<span style=color:#f92672>.</span>create(env)
</span></span><span style=display:flex><span>table_env<span style=color:#f92672>.</span>execute_sql(<span style=color:#e6db74>&#34;&#34;&#34;
</span></span></span><span style=display:flex><span><span style=color:#e6db74>CREATE CATALOG my_catalog WITH (
</span></span></span><span style=display:flex><span><span style=color:#e6db74> &#39;type&#39;=&#39;iceberg&#39;,
</span></span></span><span style=display:flex><span><span style=color:#e6db74> &#39;catalog-impl&#39;=&#39;com.my.custom.CatalogImpl&#39;,
</span></span></span><span style=display:flex><span><span style=color:#e6db74> &#39;my-additional-catalog-config&#39;=&#39;my-value&#39;
</span></span></span><span style=display:flex><span><span style=color:#e6db74>)
</span></span></span><span style=display:flex><span><span style=color:#e6db74>&#34;&#34;&#34;</span>)
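# Hypothetical helper (not a PyFlink or Iceberg API): builds the same kind of
# CREATE CATALOG statement from a dict, doubling any embedded single quotes.
def create_catalog_sql(name, props):
    def quote(s):
        return "'" + str(s).replace("'", "''") + "'"
    pairs = ",\n".join(" {}={}".format(quote(k), quote(v)) for k, v in props.items())
    return "CREATE CATALOG {} WITH (\n{}\n)".format(name, pairs)
# Usage: table_env.execute_sql(create_catalog_sql("my_catalog", {"type": "iceberg", "catalog-impl": "com.my.custom.CatalogImpl"}))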
</span></span></code></pre></div><p>Run a query:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-python data-lang=python><span style=display:flex><span>(table_env
</span></span><span style=display:flex><span> <span style=color:#f92672>.</span>sql_query(<span style=color:#e6db74>&#34;SELECT PULocationID, DOLocationID, passenger_count FROM my_catalog.nyc.taxis LIMIT 5&#34;</span>)
</span></span><span style=display:flex><span> <span style=color:#f92672>.</span>execute()
</span></span><span style=display:flex><span> <span style=color:#f92672>.</span>print())
</span></span></code></pre></div><pre tabindex=0><code>+----+----------------------+----------------------+--------------------------------+
| op |         PULocationID |         DOLocationID |                passenger_count |
+----+----------------------+----------------------+--------------------------------+
| +I |                  249 |                   48 |                            1.0 |
| +I |                  132 |                  233 |                            1.0 |
| +I |                  164 |                  107 |                            1.0 |
| +I |                   90 |                  229 |                            1.0 |
| +I |                  137 |                  249 |                            1.0 |
+----+----------------------+----------------------+--------------------------------+
5 rows in set
</code></pre><p>For more details, refer to the <a href=https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/table/intro_to_table_api/>Python Table API</a> documentation.</p><h2 id=adding-catalogs>Adding catalogs</h2><p>Flink supports creating catalogs using Flink SQL.</p><h3 id=catalog-configuration>Catalog Configuration</h3><p>A catalog is created and named by executing the following query (replace <code>&lt;catalog_name></code> with your catalog name and
<code>&lt;config_key></code>=<code>&lt;config_value></code> with your catalog implementation&rsquo;s configuration properties):</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>CATALOG</span> <span style=color:#f92672>&lt;</span><span style=color:#66d9ef>catalog_name</span><span style=color:#f92672>&gt;</span> <span style=color:#66d9ef>WITH</span> (
</span></span><span style=display:flex><span> <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
</span></span><span style=display:flex><span> <span style=color:#e6db74>&#39;&lt;config_key&gt;&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;&lt;config_value&gt;&#39;</span>
</span></span><span style=display:flex><span>);
</span></span></code></pre></div><p>The following properties can be set globally and are not limited to a specific catalog implementation:</p><ul><li><code>type</code>: Must be <code>iceberg</code>. (Required)</li><li><code>catalog-type</code>: <code>hive</code>, <code>hadoop</code> or <code>rest</code> for built-in catalogs, or left unset for custom catalog implementations that use <code>catalog-impl</code>. (Optional)</li><li><code>catalog-impl</code>: The fully-qualified class name of a custom catalog implementation. Must be set if <code>catalog-type</code> is unset. (Optional)</li><li><code>property-version</code>: A version number describing the property format, used for backwards compatibility in case the format changes. The current property version is <code>1</code>. (Optional)</li><li><code>cache-enabled</code>: Whether to enable the catalog cache. The default value is <code>true</code>. (Optional)</li><li><code>cache.expiration-interval-ms</code>: How long catalog entries are cached locally, in milliseconds. Negative values such as <code>-1</code> disable expiration; <code>0</code> is not allowed. The default value is <code>-1</code>. (Optional)</li></ul><h3 id=hive-catalog>Hive catalog</h3><p>The following creates an Iceberg catalog named <code>hive_catalog</code> that loads tables from the Hive metastore, configured with <code>'catalog-type'='hive'</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>CATALOG</span> hive_catalog <span style=color:#66d9ef>WITH</span> (
</span></span><span style=display:flex><span> <span style=color:#e6db74>&#39;type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;iceberg&#39;</span>,
</span></span><span style=display:flex><span> <span style=color:#e6db74>&#39;catalog-type&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hive&#39;</span>,
</span></span><span style=display:flex><span> <span style=color:#e6db74>&#39;uri&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;thrift://localhost:9083&#39;</span>,
</span></span><span style=display:flex><span> <span style=color:#e6db74>&#39;clients&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;5&#39;</span>,
</span></span><span style=display:flex><span> <span style=color:#e6db74>&#39;property-version&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;1&#39;</span>,
</span></span><span style=display:flex><span> <span style=color:#e6db74>&#39;warehouse&#39;</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;hdfs://nn:8020/warehouse/path&#39;</span>
</span></span><span style=display:flex><span>);
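</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#75715e>-- Alternative sketch (the catalog name and path are hypothetical placeholders): omit &#39;warehouse&#39;</span>
</span></span><span style=display:flex><span><span style=color:#75715e>-- and let the hive-site.xml in &#39;hive-conf-dir&#39; supply hive.metastore.warehouse.dir instead.</span>
</span></span><span style=display:flex><span>CREATE CATALOG hive_catalog_from_conf WITH (
</span></span><span style=display:flex><span> &#39;type&#39;=&#39;iceberg&#39;,
</span></span><span style=display:flex><span> &#39;catalog-type&#39;=&#39;hive&#39;,
</span></span><span style=display:flex><span> &#39;uri&#39;=&#39;thrift://localhost:9083&#39;,
</span></span><span style=display:flex><span> &#39;hive-conf-dir&#39;=&#39;/path/to/hive/conf&#39;
</span></span><span style=display:flex><span>);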
</span></span></code></pre></div><p>The following properties can be set when using the Hive catalog:</p><ul><li><code>uri</code>: The Thrift URI of the Hive metastore. (Required)</li><li><code>clients</code>: The Hive metastore client pool size. The default value is <code>2</code>. (Optional)</li><li><code>warehouse</code>: The Hive warehouse location. Specify this path unless you either set <code>hive-conf-dir</code> to a directory containing a <code>hive-site.xml</code> configuration file or add a correct <code>hive-site.xml</code> to the classpath.</li><li><code>hive-conf-dir</code>: Path to a directory containing a <code>hive-site.xml</code> configuration file that provides custom Hive configuration values. If both <code>hive-conf-dir</code> and <code>warehouse</code> are set when creating the Iceberg catalog, the <code>warehouse</code> value overrides <code>hive.metastore.warehouse.dir</code> from <code>&lt;hive-conf-dir&gt;/hive-site.xml</code> (or from the Hive configuration file on the classpath).</li><li><code>hadoop-conf-dir</code>: Path to a directory containing <code>core-site.xml</code> and <code>hdfs-site.xml</code> configuration files that provide custom Hadoop configuration values.</li></ul><h2 id=creating-a-table>Creating a table</h2><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>CREATE</span> <span style=color:#66d9ef>TABLE</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> (
</span></span><span style=display:flex><span> id BIGINT <span style=color:#66d9ef>COMMENT</span> <span style=color:#e6db74>&#39;unique id&#39;</span>,
</span></span><span style=display:flex><span> <span style=color:#66d9ef>data</span> STRING
</span></span><span style=display:flex><span>);
</span></span></code></pre></div><h2 id=writing>Writing</h2><p>To append new data to a table with a Flink streaming job, use <code>INSERT INTO</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#66d9ef>VALUES</span> (<span style=color:#ae81ff>1</span>, <span style=color:#e6db74>&#39;a&#39;</span>);
</span></span><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> <span style=color:#66d9ef>INTO</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#66d9ef>SELECT</span> id, <span style=color:#66d9ef>data</span> <span style=color:#66d9ef>from</span> other_kafka_table;
</span></span></code></pre></div><p>To replace data in the table with the result of a query, use <code>INSERT OVERWRITE</code> in a batch job (Flink streaming jobs do not support <code>INSERT OVERWRITE</code>). Overwrites are atomic operations for Iceberg tables.</p><p>Partitions that receive rows from the query are replaced, for example:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> OVERWRITE <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#66d9ef>VALUES</span> (<span style=color:#ae81ff>1</span>, <span style=color:#e6db74>&#39;a&#39;</span>);
</span></span></code></pre></div><p>If the table is partitioned by <code>data</code>, Iceberg also supports overwriting a specific partition, named in a static <code>PARTITION</code> clause, with the values from a <code>SELECT</code>:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>INSERT</span> OVERWRITE <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> PARTITION(<span style=color:#66d9ef>data</span><span style=color:#f92672>=</span><span style=color:#e6db74>&#39;a&#39;</span>) <span style=color:#66d9ef>SELECT</span> <span style=color:#ae81ff>6</span>;
</span></span></code></pre></div><p>The Iceberg Flink sink natively supports writing both <code>DataStream&lt;RowData></code> and <code>DataStream&lt;Row></code> to an Iceberg table. The following example appends a <code>DataStream&lt;RowData></code> to a table loaded from a Hadoop path:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>StreamExecutionEnvironment env <span style=color:#f92672>=</span> <span style=color:#f92672>...;</span>
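</span></span><span style=display:flex><span><span style=color:#75715e>// The Iceberg sink commits data files to the table when a Flink checkpoint completes,</span>
</span></span><span style=display:flex><span><span style=color:#75715e>// so enable checkpointing for streaming writes (the 60-second interval is only an example value).</span>
</span></span><span style=display:flex><span>env.enableCheckpointing(60000);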
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>DataStream<span style=color:#f92672>&lt;</span>RowData<span style=color:#f92672>&gt;</span> input <span style=color:#f92672>=</span> <span style=color:#f92672>...</span> <span style=color:#f92672>;</span>
</span></span><span style=display:flex><span>Configuration hadoopConf <span style=color:#f92672>=</span> <span style=color:#66d9ef>new</span> Configuration<span style=color:#f92672>();</span>
</span></span><span style=display:flex><span>TableLoader tableLoader <span style=color:#f92672>=</span> TableLoader<span style=color:#f92672>.</span><span style=color:#a6e22e>fromHadoopTable</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;hdfs://nn:8020/warehouse/path&#34;</span><span style=color:#f92672>,</span> hadoopConf<span style=color:#f92672>);</span>
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span>
</span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
</span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
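</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#75715e>// Alternatively, for a DataStream&lt;Row&gt; input, pass the Flink TableSchema that describes its rows.</span>
</span></span><span style=display:flex><span><span style=color:#75715e>// rowInput and tableSchema are hypothetical placeholders for your own stream and schema.</span>
</span></span><span style=display:flex><span>FlinkSink.forRow(rowInput, tableSchema)
</span></span><span style=display:flex><span> .tableLoader(tableLoader)
</span></span><span style=display:flex><span> .append();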
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span>env<span style=color:#f92672>.</span><span style=color:#a6e22e>execute</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;Test Iceberg DataStream&#34;</span><span style=color:#f92672>);</span>
</span></span></code></pre></div><h3 id=branch-writes>Branch Writes</h3><p>Writing to branches in Iceberg tables is also supported through the <code>toBranch</code> API in <code>FlinkSink</code>.
For more information on branches, please refer to <a href=../../tables/branching>branches</a>.</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-java data-lang=java><span style=display:flex><span>FlinkSink<span style=color:#f92672>.</span><span style=color:#a6e22e>forRowData</span><span style=color:#f92672>(</span>input<span style=color:#f92672>)</span>
</span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>tableLoader</span><span style=color:#f92672>(</span>tableLoader<span style=color:#f92672>)</span>
</span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>toBranch</span><span style=color:#f92672>(</span><span style=color:#e6db74>&#34;audit-branch&#34;</span><span style=color:#f92672>)</span>
</span></span><span style=display:flex><span> <span style=color:#f92672>.</span><span style=color:#a6e22e>append</span><span style=color:#f92672>();</span>
</span></span></code></pre></div><h2 id=reading>Reading</h2><p>To read a table in a Flink <strong>batch</strong> job, run the following statements:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Execute the Flink job in batch mode for the current session context
</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> batch;
</span></span><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span>;
</span></span></code></pre></div><p>Iceberg supports processing incremental data in Flink <strong>streaming</strong> jobs, starting either from the current snapshot or from a historical snapshot ID:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#75715e>-- Submit the Flink job in streaming mode for the current session.
</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> execution.runtime<span style=color:#f92672>-</span><span style=color:#66d9ef>mode</span> <span style=color:#f92672>=</span> streaming;
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#75715e>-- Enable dynamic table options, because the streaming read options below are passed as Flink SQL hints.
</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SET</span> <span style=color:#66d9ef>table</span>.<span style=color:#66d9ef>dynamic</span><span style=color:#f92672>-</span><span style=color:#66d9ef>table</span><span style=color:#f92672>-</span><span style=color:#66d9ef>options</span>.enabled<span style=color:#f92672>=</span><span style=color:#66d9ef>true</span>;
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#75715e>-- Read all the records from the current Iceberg snapshot, and then read incremental data starting from that snapshot.
</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#75715e>/*+ OPTIONS(&#39;streaming&#39;=&#39;true&#39;, &#39;monitor-interval&#39;=&#39;1s&#39;)*/</span> ;
</span></span><span style=display:flex><span>
</span></span><span style=display:flex><span><span style=color:#75715e>-- Read all incremental data starting from the snapshot-id &#39;3821550127947089987&#39; (records from this snapshot will be excluded).
</span></span></span><span style=display:flex><span><span style=color:#75715e></span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span> <span style=color:#75715e>/*+ OPTIONS(&#39;streaming&#39;=&#39;true&#39;, &#39;monitor-interval&#39;=&#39;1s&#39;, &#39;start-snapshot-id&#39;=&#39;3821550127947089987&#39;)*/</span> ;
</span></span></code></pre></div><p>SQL is also the recommended way to inspect tables. To view all of the snapshots in a table, use the snapshots metadata table:</p><div class=highlight><pre tabindex=0 style=color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4><code class=language-sql data-lang=sql><span style=display:flex><span><span style=color:#66d9ef>SELECT</span> <span style=color:#f92672>*</span> <span style=color:#66d9ef>FROM</span> <span style=color:#f92672>`</span>hive_catalog<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span><span style=color:#66d9ef>default</span><span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>sample<span style=color:#f92672>`</span>.<span style=color:#f92672>`</span>snapshots<span style=color:#f92672>`</span>
</span></span></code></pre></div><p>Iceberg supports both streaming and batch reads through the Java API. For example, a batch read:</p><pre tabindex=0><code>DataStream&lt;RowData&gt; batch = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(false)
.build();
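
// Streaming read: keep monitoring the table and emit data from new snapshots,
// starting after the given snapshot ID (the same example ID as the SQL
// 'start-snapshot-id' hint; records from that snapshot itself are excluded).
DataStream&lt;RowData&gt; stream = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.startSnapshotId(3821550127947089987L)
.build();

env.execute("Test Iceberg streaming read");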
</code></pre><h2 id=type-conversion>Type conversion</h2><p>Iceberg&rsquo;s integration for Flink automatically converts between Flink and Iceberg types. When writing to a table with types that are not supported by Flink, like UUID, Iceberg will accept and convert values from the Flink type.</p><h3 id=flink-to-iceberg>Flink to Iceberg</h3><p>Flink types are converted to Iceberg types according to the following table:</p><table><thead><tr><th>Flink</th><th>Iceberg</th><th>Notes</th></tr></thead><tbody><tr><td>boolean</td><td>boolean</td><td></td></tr><tr><td>tinyint</td><td>integer</td><td></td></tr><tr><td>smallint</td><td>integer</td><td></td></tr><tr><td>integer</td><td>integer</td><td></td></tr><tr><td>bigint</td><td>long</td><td></td></tr><tr><td>float</td><td>float</td><td></td></tr><tr><td>double</td><td>double</td><td></td></tr><tr><td>char</td><td>string</td><td></td></tr><tr><td>varchar</td><td>string</td><td></td></tr><tr><td>string</td><td>string</td><td></td></tr><tr><td>binary</td><td>fixed</td><td></td></tr><tr><td>varbinary</td><td>binary</td><td></td></tr><tr><td>decimal</td><td>decimal</td><td></td></tr><tr><td>date</td><td>date</td><td></td></tr><tr><td>time</td><td>time</td><td></td></tr><tr><td>timestamp</td><td>timestamp without timezone</td><td></td></tr><tr><td>timestamp_ltz</td><td>timestamp with timezone</td><td></td></tr><tr><td>array</td><td>list</td><td></td></tr><tr><td>map</td><td>map</td><td></td></tr><tr><td>multiset</td><td>map</td><td></td></tr><tr><td>row</td><td>struct</td><td></td></tr><tr><td>raw</td><td></td><td>Not supported</td></tr><tr><td>interval</td><td></td><td>Not supported</td></tr><tr><td>structured</td><td></td><td>Not supported</td></tr><tr><td>timestamp with zone</td><td></td><td>Not supported</td></tr><tr><td>distinct</td><td></td><td>Not supported</td></tr><tr><td>null</td><td></td><td>Not supported</td></tr><tr><td>symbol</td><td></td><td>Not supported</td></tr><tr><td>logical</td><td></td><td>Not 
supported</td></tr></tbody></table><h3 id=iceberg-to-flink>Iceberg to Flink</h3><p>Iceberg types are converted to Flink types according to the following table:</p><table><thead><tr><th>Iceberg</th><th>Flink</th></tr></thead><tbody><tr><td>boolean</td><td>boolean</td></tr><tr><td>struct</td><td>row</td></tr><tr><td>list</td><td>array</td></tr><tr><td>map</td><td>map</td></tr><tr><td>integer</td><td>integer</td></tr><tr><td>long</td><td>bigint</td></tr><tr><td>float</td><td>float</td></tr><tr><td>double</td><td>double</td></tr><tr><td>date</td><td>date</td></tr><tr><td>time</td><td>time</td></tr><tr><td>timestamp without timezone</td><td>timestamp(6)</td></tr><tr><td>timestamp with timezone</td><td>timestamp_ltz(6)</td></tr><tr><td>string</td><td>varchar(2147483647)</td></tr><tr><td>uuid</td><td>binary(16)</td></tr><tr><td>fixed(N)</td><td>binary(N)</td></tr><tr><td>binary</td><td>varbinary(2147483647)</td></tr><tr><td>decimal(P, S)</td><td>decimal(P, S)</td></tr></tbody></table><h2 id=future-improvement>Future improvements</h2><p>The following features are not yet supported by the current Flink Iceberg integration:</p><ul><li>Creating an Iceberg table with hidden partitioning. See the <a href=http://mail-archives.apache.org/mod_mbox/flink-dev/202008.mbox/%3cCABi+2jQCo3MsOa4+ywaxV5J-Z8TGKNZDX-pQLYB-dG+dVUMiMw@mail.gmail.com%3e>discussion</a> on the Flink mailing list.</li><li>Creating an Iceberg table with a computed column.</li><li>Creating an Iceberg table with a watermark.</li><li>Adding, removing, renaming, or changing columns. 
<a href=https://issues.apache.org/jira/browse/FLINK-19062>FLINK-19062</a> is tracking this.</li></ul></div><div id=toc class=markdown-body><div id=full><nav id=TableOfContents><ul><li><a href=#preparation-when-using-flink-sql-client>Preparation when using Flink SQL Client</a></li><li><a href=#flinks-python-api>Flink&rsquo;s Python API</a></li><li><a href=#adding-catalogs>Adding catalogs.</a><ul><li><a href=#catalog-configuration>Catalog Configuration</a></li><li><a href=#hive-catalog>Hive catalog</a></li></ul></li><li><a href=#creating-a-table>Creating a table</a></li><li><a href=#writing>Writing</a><ul><li><a href=#branch-writes>Branch Writes</a></li></ul></li><li><a href=#reading>Reading</a></li><li><a href=#type-conversion>Type conversion</a><ul><li><a href=#flink-to-iceberg>Flink to Iceberg</a></li><li><a href=#iceberg-to-flink>Iceberg to Flink</a></li></ul></li><li><a href=#future-improvement>Future improvements</a></li></ul></nav></div></div></div></div></section></body><script src=https://iceberg.apache.org/docs/fd-update-javadocs//js/jquery-1.11.0.js></script>
<script src=https://iceberg.apache.org/docs/fd-update-javadocs//js/jquery.easing.min.js></script>
<script type=text/javascript src=https://iceberg.apache.org/docs/fd-update-javadocs//js/search.js></script>
<script src=https://iceberg.apache.org/docs/fd-update-javadocs//js/bootstrap.min.js></script>
<script src=https://iceberg.apache.org/docs/fd-update-javadocs//js/iceberg-theme.js></script></html>