blob: 636caea56c8f50f0ef799682065bcc96a12364a7 [file] [log] [blame]
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Apache Flink Runner</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<script type=text/javascript src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script>
<script type=text/javascript src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script>
<script type=text/javascript src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script>
<script type=text/javascript src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script>
<script type=text/javascript src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script>
<script type=text/javascript src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script>
<script type=text/javascript src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script>
<script type=text/javascript src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script>
<script type=text/javascript src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/runners/flink/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","//www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script><script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class=navbar-link href=/get-started/>Get Started</a>
<a class=navbar-link href=/documentation/>Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/runners/flink.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class=navbar-link href=/community/>Community</a>
<a class=navbar-link href=/contribute/>Contribute</a>
<a class=navbar-link href=/blog/>Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/runners/flink.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>
// Search-bar and mobile-menu helpers invoked from the inline onclick
// attributes in the navigation markup (openMenu, showSearch, endSearch).

// Reveal the search bar and hide the icon bar.
function showSearch() {
  addPlaceholder();
  var searchBar = document.querySelector(".searchBar");
  searchBar.classList.remove("disappear");
  var iconsBar = document.querySelector("#iconsBar");
  iconsBar.classList.add("disappear");
}

// Set the placeholder attribute on every element matched by "input:text".
function addPlaceholder() {
  $("input:text").attr("placeholder", "What are you looking for?");
}

// Hide the search bar again and bring the icon bar back.
function endSearch() {
  var searchBar = document.querySelector(".searchBar");
  searchBar.classList.add("disappear");
  var iconsBar = document.querySelector("#iconsBar");
  iconsBar.classList.remove("disappear");
}

// Toggle the "fixedPosition" class on <body>.
function blockScroll() {
  $("body").toggleClass("fixedPosition");
}

// Handler for the mobile menu button: set the placeholder, then toggle the body class.
function openMenu() {
  addPlaceholder();
  blockScroll();
}
</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Runners</span></li><li><a href=/documentation/runners/capability-matrix/>Capability Matrix</a></li><li><a href=/documentation/runners/direct/>Direct Runner</a></li><li><a href=/documentation/runners/flink/>Apache Flink</a></li><li><a href=/documentation/runners/nemo/>Apache Nemo</a></li><li><a href=/documentation/runners/samza/>Apache Samza</a></li><li><a href=/documentation/runners/spark/>Apache Spark</a></li><li><a href=/documentation/runners/dataflow/>Google Cloud Dataflow</a></li><li><a href=/documentation/runners/jet/>Hazelcast Jet</a></li><li><a href=/documentation/runners/twister2/>Twister2</a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#prerequisites-and-setup>Prerequisites and Setup</a><ul><li><a href=#dependencies>Dependencies</a></li><li><a href=#executing-a-beam-pipeline-on-a-flink-cluster>Executing a Beam pipeline on a Flink Cluster</a></li></ul></li><li><a href=#additional-information-and-caveats>Additional information and caveats</a><ul><li><a href=#monitoring-your-job>Monitoring your job</a></li><li><a 
href=#streaming-execution>Streaming Execution</a></li></ul></li><li><a href=#pipeline-options-for-the-flink-runner>Pipeline options for the Flink Runner</a></li><li><a href=#flink-version-compatibility>Flink Version Compatibility</a></li><li><a href=#beam-capability>Beam Capability</a></li></ul></nav></nav><div class="body__contained body__section-nav"><h1 id=overview>Overview</h1><p>The Apache Flink Runner can be used to execute Beam pipelines using <a href=https://flink.apache.org>Apache
Flink</a>. For execution you can choose between a cluster
execution mode (e.g. Yarn/Kubernetes/Mesos) or a local embedded execution mode
which is useful for testing pipelines.</p><p>The Flink Runner and Flink are suitable for large scale, continuous jobs, and provide:</p><ul><li>A streaming-first runtime that supports both batch processing and data streaming programs</li><li>A runtime that supports very high throughput and low event latency at the same time</li><li>Fault-tolerance with <em>exactly-once</em> processing guarantees</li><li>Natural back-pressure in streaming programs</li><li>Custom memory management for efficient and robust switching between in-memory and out-of-core data processing algorithms</li><li>Integration with YARN and other components of the Apache Hadoop ecosystem</li></ul><h1 id=using-the-apache-flink-runner>Using the Apache Flink Runner</h1><p>It is important to understand that the Flink Runner comes in two flavors:</p><ol><li>The original <em>classic Runner</em> which supports only Java (and other JVM-based languages)</li><li>The newer <em>portable Runner</em> which supports Java/Python/Go</li></ol><p>You may ask why there are two Runners?</p><p>Beam and its Runners originally only supported JVM-based languages
(e.g. Java/Scala/Kotlin). Python and Go SDKs were added later on. The
architecture of the Runners had to be changed significantly to support executing
pipelines written in other languages.</p><p>If your applications only use Java, then you should currently go with the classic
Runner. Eventually, the portable Runner will replace the classic Runner because
it contains the generalized framework for executing Java, Python, Go, and more
languages in the future.</p><p>If you want to run Python pipelines with Beam on Flink, you should use the
portable Runner. For more information on
portability, please visit the <a href=/roadmap/portability/>Portability page</a>.</p><p>Consequently, this guide is split into parts to document the classic and
the portable functionality of the Flink Runner.
In addition, Python provides convenience wrappers to handle the full lifecycle of the runner,
and so is further split depending on whether to manage the portability
components automatically (recommended) or manually.
Please use the switcher below to select the appropriate mode for the Runner:</p><nav class=language-switcher><strong>Adapt for:</strong><ul><li data-value=java>Classic (Java)</li><li data-value=py>Portable (Python)</li><li data-value=portable>Portable (Java/Python/Go)</li></ul></nav><h2 id=prerequisites-and-setup>Prerequisites and Setup</h2><p>If you want to use the local execution mode with the Flink Runner you don&rsquo;t have
to complete any cluster setup. You can simply run your Beam pipeline. Be sure to
set the Runner to <span class="language-java language-py"><code>FlinkRunner</code></span><span class=language-portable><code>PortableRunner</code></span>.</p><p>To use the Flink Runner for executing on a cluster, you have to set up a Flink cluster by following the
Flink <a href=https://ci.apache.org/projects/flink/flink-docs-stable/quickstart/setup_quickstart.html#setup-download-and-start-flink>Setup Quickstart</a>.</p><h3 id=dependencies>Dependencies</h3><p class=language-java>You must specify your dependency on the Flink Runner
in your <code>pom.xml</code> or <code>build.gradle</code>. Use the Beam version and the artifact id
from the <a href=#flink-version-compatibility>compatibility table</a> below. For example:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=o>&lt;</span><span class=n>dependency</span><span class=o>&gt;</span>
</span></span><span class=line><span class=cl> <span class=o>&lt;</span><span class=n>groupId</span><span class=o>&gt;</span><span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>beam</span><span class=o>&lt;/</span><span class=n>groupId</span><span class=o>&gt;</span>
</span></span><span class=line><span class=cl> <span class=o>&lt;</span><span class=n>artifactId</span><span class=o>&gt;</span><span class=n>beam</span><span class=o>-</span><span class=n>runners</span><span class=o>-</span><span class=n>flink</span><span class=o>-</span><span class=n>1</span><span class=o>.</span><span class=na>16</span><span class=o>&lt;/</span><span class=n>artifactId</span><span class=o>&gt;</span>
</span></span><span class=line><span class=cl> <span class=o>&lt;</span><span class=n>version</span><span class=o>&gt;</span><span class=n>2</span><span class=o>.</span><span class=na>56</span><span class=o>.</span><span class=na>0</span><span class=o>&lt;/</span><span class=n>version</span><span class=o>&gt;</span>
</span></span><span class=line><span class=cl><span class=o>&lt;/</span><span class=n>dependency</span><span class=o>&gt;</span></span></span></code></pre></div></div></div><p class=language-py>You will need Docker to be installed in your execution environment.
To run an embedded Flink cluster or use the Flink runner for Python &lt; 3.6
you will also need to have Java available in your execution environment.</p><p class=language-portable>You will need Docker to be installed in your execution environment.</p><h3 id=executing-a-beam-pipeline-on-a-flink-cluster>Executing a Beam pipeline on a Flink Cluster</h3><p class=language-java>For executing a pipeline on a Flink cluster you need to package your program
along with all dependencies in a so-called fat jar. How you do this depends on
your build system but if you follow along the <a href=/get-started/quickstart/>Beam Quickstart</a> this is the command that you have to run:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>$</span> <span class=n>mvn</span> <span class=kn>package</span> <span class=err>-</span><span class=nn>Pflink</span><span class=o>-</span><span class=n>runner</span></span></span></code></pre></div></div></div><p class=language-java>Look for the output JAR of this command in the
<code>target</code> folder.</p><p class=language-java>The Beam Quickstart Maven project is set up to use the Maven Shade plugin to
create a fat jar and the <code>-Pflink-runner</code> argument makes sure to include the
dependency on the Flink Runner.</p><p class=language-java>For running the pipeline the easiest option is to use the <code>flink</code> command which
is part of Flink:</p><p class=language-java><code>$ bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar
--runner=FlinkRunner --other-parameters</code></p><p class=language-java>Alternatively you can also use Maven&rsquo;s exec command. For example, to execute the
WordCount example:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>mvn</span> <span class=n>exec</span><span class=o>:</span><span class=n>java</span> <span class=o>-</span><span class=n>Dexec</span><span class=o>.</span><span class=na>mainClass</span><span class=o>=</span><span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>beam</span><span class=o>.</span><span class=na>examples</span><span class=o>.</span><span class=na>WordCount</span> <span class=err>\</span>
</span></span><span class=line><span class=cl> <span class=o>-</span><span class=n>Pflink</span><span class=o>-</span><span class=n>runner</span> <span class=err>\</span>
</span></span><span class=line><span class=cl> <span class=o>-</span><span class=n>Dexec</span><span class=o>.</span><span class=na>args</span><span class=o>=</span><span class=s>&#34;--runner=FlinkRunner \
</span></span></span><span class=line><span class=cl><span class=s> --inputFile=/path/to/pom.xml \
</span></span></span><span class=line><span class=cl><span class=s> --output=/path/to/counts \
</span></span></span><span class=line><span class=cl><span class=s> --flinkMaster=&lt;flink master url&gt; \
</span></span></span><span class=line><span class=cl><span class=s> --filesToStage=target/word-count-beam-bundled-0.1.jar&#34;</span></span></span></code></pre></div></div></div><p class=language-java>If you have a Flink <code>JobManager</code> running on your local machine you can provide <code>localhost:8081</code> for
<code>flinkMaster</code>. Otherwise an embedded Flink cluster will be started for the job.</p><p class=language-py>To run a pipeline on Flink, set the runner to <code>FlinkRunner</code>
and <code>flink_master</code> to the master URL of a Flink cluster.
In addition, optionally set <code>environment_type</code> to <code>LOOPBACK</code>. For example,
after starting up a <a href=https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/tutorials/local_setup.html>local Flink cluster</a>,
one could run:</p><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=kn>import</span> <span class=nn>apache_beam</span> <span class=k>as</span> <span class=nn>beam</span>
</span></span><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam.options.pipeline_options</span> <span class=kn>import</span> <span class=n>PipelineOptions</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>options</span> <span class=o>=</span> <span class=n>PipelineOptions</span><span class=p>([</span>
</span></span><span class=line><span class=cl> <span class=s2>&#34;--runner=FlinkRunner&#34;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s2>&#34;--flink_master=localhost:8081&#34;</span><span class=p>,</span>
</span></span><span class=line><span class=cl> <span class=s2>&#34;--environment_type=LOOPBACK&#34;</span>
</span></span><span class=line><span class=cl><span class=p>])</span>
</span></span><span class=line><span class=cl><span class=k>with</span> <span class=n>beam</span><span class=o>.</span><span class=n>Pipeline</span><span class=p>(</span><span class=n>options</span><span class=o>=</span><span class=n>options</span><span class=p>)</span> <span class=k>as</span> <span class=n>p</span><span class=p>:</span>
</span></span><span class=line><span class=cl> <span class=o>...</span></span></span></code></pre></div></div></div><p class=language-py>To run on an embedded Flink cluster, simply omit the <code>flink_master</code> option
and an embedded Flink cluster will be automatically started and shut down for the job.</p><p class=language-py>The optional <code>flink_version</code> option may be required as well for older versions of Python.</p><p class=language-portable>Starting with Beam 2.18.0, pre-built Flink Job Service Docker images are available at Docker Hub:
<a href=https://hub.docker.com/r/apache/beam_flink1.14_job_server>Flink 1.14</a>,
<a href=https://hub.docker.com/r/apache/beam_flink1.15_job_server>Flink 1.15</a>,
<a href=https://hub.docker.com/r/apache/beam_flink1.16_job_server>Flink 1.16</a>,
<a href=https://hub.docker.com/r/apache/beam_flink1.17_job_server>Flink 1.17</a>.</p><p class=language-portable>To run a pipeline on an embedded Flink cluster:</p><p class=language-portable>(1) Start the JobService endpoint: <code>docker run --net=host apache/beam_flink1.10_job_server:latest</code></p><p class=language-portable>The JobService is the central instance where you submit your Beam pipeline to.
The JobService will create a Flink job for the pipeline and execute the job.</p><p class=language-portable>(2) Submit the Python pipeline to the above endpoint by using the <code>PortableRunner</code>, <code>job_endpoint</code> set to <code>localhost:8099</code> (this is the default address of the JobService).
Optionally set <code>environment_type</code> to <code>LOOPBACK</code>. For example:</p><div class='language-portable snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-portable data-lang=portable>import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions([
&#34;--runner=PortableRunner&#34;,
&#34;--job_endpoint=localhost:8099&#34;,
&#34;--environment_type=LOOPBACK&#34;
])
with beam.Pipeline(options=options) as p:
...</code></pre></div></div><p class=language-portable>To run on a separate <a href=https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/tutorials/local_setup.html>Flink cluster</a>:</p><p class=language-portable>(1) Start a Flink cluster which exposes the Rest interface (e.g. <code>localhost:8081</code> by default).</p><p class=language-portable>(2) Start JobService with Flink Rest endpoint: <code>docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081</code>.</p><p class=language-portable>(3) Submit the pipeline as above.</p><div class='language-portable snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-portable data-lang=portable>import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions([
&#34;--runner=PortableRunner&#34;,
&#34;--job_endpoint=localhost:8099&#34;,
&#34;--environment_type=LOOPBACK&#34;
])
with beam.Pipeline(options=options) as p:
...</code></pre></div></div><p class="language-py language-portable">Note that <code>environment_type=LOOPBACK</code> is only intended for local testing,
and will not work on remote clusters.
See <a href=/documentation/runtime/sdk-harness-config/>here</a> for details.</p><h2 id=additional-information-and-caveats>Additional information and caveats</h2><h3 id=monitoring-your-job>Monitoring your job</h3><p>You can monitor a running Flink job using the Flink JobManager Dashboard or its Rest interfaces. By default, this is available at port <code>8081</code> of the JobManager node. If you have a Flink installation on your local machine that would be <code>http://localhost:8081</code>. Note: When you use the <code>[local]</code> mode an embedded Flink cluster will be started which does not make a dashboard available.</p><h3 id=streaming-execution>Streaming Execution</h3><p>If your pipeline uses an unbounded data source or sink, the Flink Runner will automatically switch to streaming mode. You can enforce streaming mode by using the <code>--streaming</code> flag.</p><p>Note: The Runner will print a warning message when unbounded sources are used and checkpointing is not enabled.
Many sources like <code>PubSubIO</code> rely on their checkpoints to be acknowledged which can only be done when checkpointing is enabled for the <code>FlinkRunner</code>. To enable checkpointing, please set <span class=language-java><code>checkpointingInterval</code></span><span class=language-py><code>checkpointing_interval</code></span> to the desired checkpointing interval in milliseconds.</p><h2 id=pipeline-options-for-the-flink-runner>Pipeline options for the Flink Runner</h2><p>When executing your pipeline with the Flink Runner, you can set these pipeline options.</p><p>The following list of Flink-specific pipeline options is generated automatically from the
<a href=https://beam.apache.org/releases/javadoc/2.56.0/index.html?org/apache/beam/runners/flink/FlinkPipelineOptions.html>FlinkPipelineOptions</a>
reference class:</p><div class=language-java><table class="table table-bordered"><tr><td><code>allowNonRestoredState</code></td><td>Flag indicating whether non restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline.</td><td>Default: <code>false</code></td></tr><tr><td><code>attachedMode</code></td><td>Specifies if the pipeline is submitted in attached or detached mode</td><td>Default: <code>true</code></td></tr><tr><td><code>autoBalanceWriteFilesShardingEnabled</code></td><td>Flag indicating whether auto-balance sharding for WriteFiles transform should be enabled. This might prove useful in streaming use-case, where pipeline needs to write quite many events into files, typically divided into N shards. Default behavior on Flink would be, that some workers will receive more shards to take care of than others. This cause workers to go out of balance in terms of processing backlog and memory usage. Enabling this feature will make shards to be spread evenly among available workers in improve throughput and memory usage stability.</td><td>Default: <code>false</code></td></tr><tr><td><code>autoWatermarkInterval</code></td><td>The interval in milliseconds for automatic watermark emission.</td><td></td></tr><tr><td><code>checkpointTimeoutMillis</code></td><td>The maximum time in milliseconds that a checkpoint may take before being discarded.</td><td>Default: <code>-1</code></td></tr><tr><td><code>checkpointingInterval</code></td><td>The interval in milliseconds at which to trigger checkpoints of the running pipeline. 
Default: No checkpointing.</td><td>Default: <code>-1</code></td></tr><tr><td><code>checkpointingMode</code></td><td>The checkpointing mode that defines consistency guarantee.</td><td>Default: <code>EXACTLY_ONCE</code></td></tr><tr><td><code>disableMetrics</code></td><td>Disable Beam metrics in Flink Runner</td><td>Default: <code>false</code></td></tr><tr><td><code>enableStableInputDrain</code></td><td>Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining,the RequiresStableInput contract might be violated if there any processing related failures in the DoFn operator.</td><td>Default: <code>false</code></td></tr><tr><td><code>executionModeForBatch</code></td><td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td><td>Default: <code>PIPELINED</code></td></tr><tr><td><code>executionRetryDelay</code></td><td>Sets the delay in milliseconds between executions. A value of {@code -1} indicates that the default value should be used.</td><td>Default: <code>-1</code></td></tr><tr><td><code>externalizedCheckpointsEnabled</code></td><td>Enables or disables externalized checkpoints. Works in conjunction with CheckpointingInterval</td><td>Default: <code>false</code></td></tr><tr><td><code>failOnCheckpointingErrors</code></td><td>Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure. If this is set to true, the task will fail on checkpointing error. If this is set to false, the task will only decline the checkpoint and continue running.</td><td>Default: <code>true</code></td></tr><tr><td><code>fasterCopy</code></td><td>Remove unneeded deep copy between operators. 
See https://issues.apache.org/jira/browse/BEAM-11146</td><td>Default: <code>false</code></td></tr><tr><td><code>fileInputSplitMaxSizeMB</code></td><td>Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.</td><td>Default: <code>0</code></td></tr><tr><td><code>finishBundleBeforeCheckpointing</code></td><td>If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. The setting may affect the checkpoint alignment.</td><td>Default: <code>false</code></td></tr><tr><td><code>flinkConfDir</code></td><td>Directory containing Flink YAML configuration files. These properties will be set to all jobs submitted to Flink and take precedence over configurations in FLINK_CONF_DIR.</td><td></td></tr><tr><td><code>flinkMaster</code></td><td>Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto].</td><td>Default: <code>[auto]</code></td></tr><tr><td><code>jobCheckIntervalInSecs</code></td><td>Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds</td><td>Default: <code>5</code></td></tr><tr><td><code>latencyTrackingInterval</code></td><td>Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value &lt;= 0 disables the feature.</td><td>Default: <code>0</code></td></tr><tr><td><code>maxBundleSize</code></td><td>The maximum number of elements in a bundle. Default values are 1000 for a streaming job and 1,000,000 for batch</td><td>Default: <code>MaxBundleSizeFactory</code></td></tr><tr><td><code>maxBundleTimeMills</code></td><td>The maximum time to wait before finalising a bundle (in milliseconds). 
Default values are 1000 for streaming and 10,000 for batch.</td><td>Default: <code>MaxBundleTimeFactory</code></td></tr><tr><td><code>maxParallelism</code></td><td>The pipeline wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.</td><td>Default: <code>-1</code></td></tr><tr><td><code>minPauseBetweenCheckpoints</code></td><td>The minimal pause in milliseconds before the next checkpoint is triggered.</td><td>Default: <code>-1</code></td></tr><tr><td><code>numConcurrentCheckpoints</code></td><td>The maximum number of concurrent checkpoints. Defaults to 1 (=no concurrent checkpoints).</td><td>Default: <code>1</code></td></tr><tr><td><code>numberOfExecutionRetries</code></td><td>Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of -1 indicates that the system default value (as defined in the configuration) should be used.</td><td>Default: <code>-1</code></td></tr><tr><td><code>objectReuse</code></td><td>Sets the behavior of reusing objects.</td><td>Default: <code>false</code></td></tr><tr><td><code>operatorChaining</code></td><td>Sets the behavior of operator chaining.</td><td>Default: <code>true</code></td></tr><tr><td><code>parallelism</code></td><td>The degree of parallelism to be used when distributing operations onto workers. If the parallelism is not set, the configured Flink default is used, or 1 if none can be found.</td><td>Default: <code>-1</code></td></tr><tr><td><code>reIterableGroupByKeyResult</code></td><td>Flag indicating whether result of GBK needs to be re-iterable. 
Re-iterable result implies that all values for a single key must fit in memory as we currently do not support spilling to disk.</td><td>Default: <code>false</code></td></tr><tr><td><code>reportCheckpointDuration</code></td><td>If not null, reports the checkpoint duration of each ParDo stage in the provided metric namespace.</td><td></td></tr><tr><td><code>retainExternalizedCheckpointsOnCancellation</code></td><td>Sets the behavior of externalized checkpoints on cancellation.</td><td>Default: <code>false</code></td></tr><tr><td><code>savepointPath</code></td><td>Savepoint restore path. If specified, restores the streaming pipeline from the provided path.</td><td></td></tr><tr><td><code>shutdownSourcesAfterIdleMs</code></td><td>Shuts down sources which have been idle for the configured time of milliseconds. Once a source has been shut down, checkpointing is not possible anymore. Shutting down the sources eventually leads to pipeline shutdown (=Flink job finishes) once all input has been processed. Unless explicitly set, this will default to Long.MAX_VALUE when checkpointing is enabled and to 0 when checkpointing is disabled. See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue.</td><td>Default: <code>-1</code></td></tr><tr><td><code>stateBackend</code></td><td>State backend to store Beam's state. Use 'rocksdb' or 'filesystem'.</td><td></td></tr><tr><td><code>stateBackendFactory</code></td><td>Sets the state backend factory to use in streaming mode. Defaults to the flink cluster's state.backend configuration.</td><td></td></tr><tr><td><code>stateBackendStoragePath</code></td><td>State backend path to persist state backend data. 
Used to initialize state backend.</td><td></td></tr></table></div><div class=language-py><table class="table table-bordered"><tr><td><code>allow_non_restored_state</code></td><td>Flag indicating whether non restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline.</td><td>Default: <code>false</code></td></tr><tr><td><code>attached_mode</code></td><td>Specifies if the pipeline is submitted in attached or detached mode</td><td>Default: <code>true</code></td></tr><tr><td><code>auto_balance_write_files_sharding_enabled</code></td><td>Flag indicating whether auto-balance sharding for WriteFiles transform should be enabled. This might prove useful in streaming use-case, where pipeline needs to write quite many events into files, typically divided into N shards. Default behavior on Flink would be, that some workers will receive more shards to take care of than others. This cause workers to go out of balance in terms of processing backlog and memory usage. Enabling this feature will make shards to be spread evenly among available workers in improve throughput and memory usage stability.</td><td>Default: <code>false</code></td></tr><tr><td><code>auto_watermark_interval</code></td><td>The interval in milliseconds for automatic watermark emission.</td><td></td></tr><tr><td><code>checkpoint_timeout_millis</code></td><td>The maximum time in milliseconds that a checkpoint may take before being discarded.</td><td>Default: <code>-1</code></td></tr><tr><td><code>checkpointing_interval</code></td><td>The interval in milliseconds at which to trigger checkpoints of the running pipeline. 
Default: No checkpointing.</td><td>Default: <code>-1</code></td></tr><tr><td><code>checkpointing_mode</code></td><td>The checkpointing mode that defines consistency guarantee.</td><td>Default: <code>EXACTLY_ONCE</code></td></tr><tr><td><code>disable_metrics</code></td><td>Disable Beam metrics in Flink Runner</td><td>Default: <code>false</code></td></tr><tr><td><code>enable_stable_input_drain</code></td><td>Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining,the RequiresStableInput contract might be violated if there any processing related failures in the DoFn operator.</td><td>Default: <code>false</code></td></tr><tr><td><code>execution_mode_for_batch</code></td><td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td><td>Default: <code>PIPELINED</code></td></tr><tr><td><code>execution_retry_delay</code></td><td>Sets the delay in milliseconds between executions. A value of {@code -1} indicates that the default value should be used.</td><td>Default: <code>-1</code></td></tr><tr><td><code>externalized_checkpoints_enabled</code></td><td>Enables or disables externalized checkpoints. Works in conjunction with CheckpointingInterval</td><td>Default: <code>false</code></td></tr><tr><td><code>fail_on_checkpointing_errors</code></td><td>Sets the expected behaviour for tasks in case that they encounter an error in their checkpointing procedure. If this is set to true, the task will fail on checkpointing error. If this is set to false, the task will only decline the checkpoint and continue running.</td><td>Default: <code>true</code></td></tr><tr><td><code>faster_copy</code></td><td>Remove unneeded deep copy between operators. 
See https://issues.apache.org/jira/browse/BEAM-11146</td><td>Default: <code>false</code></td></tr><tr><td><code>file_input_split_max_size_m_b</code></td><td>Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.</td><td>Default: <code>0</code></td></tr><tr><td><code>finish_bundle_before_checkpointing</code></td><td>If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. The setting may affect the checkpoint alignment.</td><td>Default: <code>false</code></td></tr><tr><td><code>flink_conf_dir</code></td><td>Directory containing Flink YAML configuration files. These properties will be set to all jobs submitted to Flink and take precedence over configurations in FLINK_CONF_DIR.</td><td></td></tr><tr><td><code>flink_master</code></td><td>Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto].</td><td>Default: <code>[auto]</code></td></tr><tr><td><code>job_check_interval_in_secs</code></td><td>Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds</td><td>Default: <code>5</code></td></tr><tr><td><code>latency_tracking_interval</code></td><td>Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value &lt;= 0 disables the feature.</td><td>Default: <code>0</code></td></tr><tr><td><code>max_bundle_size</code></td><td>The maximum number of elements in a bundle. Default values are 1000 for a streaming job and 1,000,000 for batch</td><td>Default: <code>MaxBundleSizeFactory</code></td></tr><tr><td><code>max_bundle_time_mills</code></td><td>The maximum time to wait before finalising a bundle (in milliseconds). 
Default values are 1000 for streaming and 10,000 for batch.</td><td>Default: <code>MaxBundleTimeFactory</code></td></tr><tr><td><code>max_parallelism</code></td><td>The pipeline wide maximum degree of parallelism to be used. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.</td><td>Default: <code>-1</code></td></tr><tr><td><code>min_pause_between_checkpoints</code></td><td>The minimal pause in milliseconds before the next checkpoint is triggered.</td><td>Default: <code>-1</code></td></tr><tr><td><code>num_concurrent_checkpoints</code></td><td>The maximum number of concurrent checkpoints. Defaults to 1 (=no concurrent checkpoints).</td><td>Default: <code>1</code></td></tr><tr><td><code>number_of_execution_retries</code></td><td>Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of -1 indicates that the system default value (as defined in the configuration) should be used.</td><td>Default: <code>-1</code></td></tr><tr><td><code>object_reuse</code></td><td>Sets the behavior of reusing objects.</td><td>Default: <code>false</code></td></tr><tr><td><code>operator_chaining</code></td><td>Sets the behavior of operator chaining.</td><td>Default: <code>true</code></td></tr><tr><td><code>parallelism</code></td><td>The degree of parallelism to be used when distributing operations onto workers. If the parallelism is not set, the configured Flink default is used, or 1 if none can be found.</td><td>Default: <code>-1</code></td></tr><tr><td><code>re_iterable_group_by_key_result</code></td><td>Flag indicating whether result of GBK needs to be re-iterable. 
Re-iterable result implies that all values for a single key must fit in memory as we currently do not support spilling to disk.</td><td>Default: <code>false</code></td></tr><tr><td><code>report_checkpoint_duration</code></td><td>If not null, reports the checkpoint duration of each ParDo stage in the provided metric namespace.</td><td></td></tr><tr><td><code>retain_externalized_checkpoints_on_cancellation</code></td><td>Sets the behavior of externalized checkpoints on cancellation.</td><td>Default: <code>false</code></td></tr><tr><td><code>savepoint_path</code></td><td>Savepoint restore path. If specified, restores the streaming pipeline from the provided path.</td><td></td></tr><tr><td><code>shutdown_sources_after_idle_ms</code></td><td>Shuts down sources which have been idle for the configured time of milliseconds. Once a source has been shut down, checkpointing is not possible anymore. Shutting down the sources eventually leads to pipeline shutdown (=Flink job finishes) once all input has been processed. Unless explicitly set, this will default to Long.MAX_VALUE when checkpointing is enabled and to 0 when checkpointing is disabled. See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue.</td><td>Default: <code>-1</code></td></tr><tr><td><code>state_backend</code></td><td>State backend to store Beam's state. Use 'rocksdb' or 'filesystem'.</td><td></td></tr><tr><td><code>state_backend_factory</code></td><td>Sets the state backend factory to use in streaming mode. Defaults to the flink cluster's state.backend configuration.</td><td></td></tr><tr><td><code>state_backend_storage_path</code></td><td>State backend path to persist state backend data. Used to initialize state backend.</td><td></td></tr></table></div><p>For general Beam pipeline options see the
<a href=https://beam.apache.org/releases/javadoc/2.56.0/index.html?org/apache/beam/sdk/options/PipelineOptions.html>PipelineOptions</a>
reference.</p><h2 id=flink-version-compatibility>Flink Version Compatibility</h2><p>The Flink cluster version has to match the minor version used by the FlinkRunner.
The minor version is the first two numbers in the version string, e.g. in <code>1.16.0</code> the
minor version is <code>1.16</code>.</p><p>We try to track the latest version of Apache Flink at the time of the Beam release.
A Flink version is supported by Beam for as long as it is supported by the Flink community.
The Flink community supports the last two minor versions. When support for a Flink version is dropped, it may also be deprecated and removed from Beam.
To find out which version of Flink is compatible with Beam please see the table below:</p><table class="table table-bordered"><tr><th>Flink Version</th><th>Artifact Id</th><th>Supported Beam Versions</th></tr><tr><td>1.17.x</td><td>beam-runners-flink-1.17</td><td>&ge; 2.56.0</td></tr><tr><td>1.16.x</td><td>beam-runners-flink-1.16</td><td>&ge; 2.47.0</td></tr><tr><td>1.15.x</td><td>beam-runners-flink-1.15</td><td>&ge; 2.40.0</td></tr><tr><td>1.14.x</td><td>beam-runners-flink-1.14</td><td>&ge; 2.38.0</td></tr><tr><td>1.13.x</td><td>beam-runners-flink-1.13</td><td>2.31.0 - 2.55.0</td></tr><tr><td>1.12.x</td><td>beam-runners-flink-1.12</td><td>2.27.0 - 2.55.0</td></tr><tr><td>1.11.x</td><td>beam-runners-flink-1.11</td><td>2.25.0 - 2.38.0</td></tr><tr><td>1.10.x</td><td>beam-runners-flink-1.10</td><td>2.21.0 - 2.30.0</td></tr><tr><td>1.9.x</td><td>beam-runners-flink-1.9</td><td>2.17.0 - 2.29.0</td></tr><tr><td>1.8.x</td><td>beam-runners-flink-1.8</td><td>2.13.0 - 2.29.0</td></tr><tr><td>1.7.x</td><td>beam-runners-flink-1.7</td><td>2.10.0 - 2.20.0</td></tr><tr><td>1.6.x</td><td>beam-runners-flink-1.6</td><td>2.10.0 - 2.16.0</td></tr><tr><td>1.5.x</td><td>beam-runners-flink_2.11</td><td>2.6.0 - 2.16.0</td></tr><tr><td>1.4.x with Scala 2.11</td><td>beam-runners-flink_2.11</td><td>2.3.0 - 2.5.0</td></tr><tr><td>1.3.x with Scala 2.10</td><td>beam-runners-flink_2.10</td><td>2.1.x - 2.2.0</td></tr><tr><td>1.2.x with Scala 2.10</td><td>beam-runners-flink_2.10</td><td>2.0.0</td></tr></table><p>For retrieving the right Flink version, see the <a href=https://flink.apache.org/downloads.html>Flink downloads page</a>.</p><p>For more information, the <a href=https://ci.apache.org/projects/flink/flink-docs-stable/>Flink Documentation</a> can be helpful.</p><h2 id=beam-capability>Beam Capability</h2><p>The <a href=/documentation/runners/capability-matrix/>Beam Capability Matrix</a> documents the
capabilities of the classic Flink Runner.</p><p>The <a href=https://s.apache.org/apache-beam-portability-support-table>Portable Capability
Matrix</a> documents
the capabilities of the portable Flink Runner.</p><div class=feedback><p class=update>Last updated on 2024/05/09</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a 
href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>