blob: c4c1fe18e1481293f7c4736cd50ccad37847c3c1 [file] [log] [blame]
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Runner Authoring Guide</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<script type=text/javascript src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script>
<script type=text/javascript src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script>
<script type=text/javascript src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script>
<script type=text/javascript src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script>
<script type=text/javascript src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script>
<script type=text/javascript src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script>
<script type=text/javascript src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script>
<script type=text/javascript src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script>
<script type=text/javascript src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/contribute/runner-guide/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<!-- Google Analytics (analytics.js): define a ga() command queue, load analytics.js asynchronously, then create the tracker and queue a pageview.
     The library is requested over explicit https rather than a protocol-relative URL so it is never fetched over plain http.
     NOTE(review): "UA-" IDs are Universal Analytics properties, which Google has retired; confirm this tag still collects data or migrate to a GA4 measurement ID. -->
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","https://www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script>
<!-- Hotjar: define an hj() command queue and asynchronously append hotjar-<hjid>.js?sv=<hjsv> to the document head. -->
<script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class=navbar-link href=/get-started/>Get Started</a>
<a class=navbar-link href=/documentation/>Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>
// Google Programmable Search loader: injects cse.js (for search engine `cx`)
// asynchronously, ahead of the first <script> element on the page. cse.js then
// renders the <gcse:search> element that follows this script.
(function () {
  var cx = "012923275103528129024:4emlchv9wzi";
  var src = "https://cse.google.com/cse.js?cx=" + cx;
  var scripts = document.getElementsByTagName("script");
  // This page embeds this loader more than once (mobile and desktop
  // navigation); skip the request if cse.js has already been injected so the
  // library is not downloaded twice.
  for (var i = 0; i < scripts.length; i++) {
    if (scripts[i].getAttribute("src") === src) {
      return;
    }
  }
  var loader = document.createElement("script");
  loader.type = "text/javascript";
  loader.async = true;
  loader.src = src;
  var first = scripts[0];
  first.parentNode.insertBefore(loader, first);
})();
</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/contribute/runner-guide.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class=navbar-link href=/community/>Community</a>
<a class=navbar-link href=/contribute/>Contribute</a>
<a class=navbar-link href=/blog/>Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/contribute/runner-guide.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>
// Google Programmable Search loader: injects cse.js (for search engine `cx`)
// asynchronously, ahead of the first <script> element on the page. cse.js then
// renders the <gcse:search> element that follows this script.
(function () {
  var cx = "012923275103528129024:4emlchv9wzi";
  var src = "https://cse.google.com/cse.js?cx=" + cx;
  var scripts = document.getElementsByTagName("script");
  // This page embeds this loader more than once (mobile and desktop
  // navigation); skip the request if cse.js has already been injected so the
  // library is not downloaded twice.
  for (var i = 0; i < scripts.length; i++) {
    if (scripts[i].getAttribute("src") === src) {
      return;
    }
  }
  var loader = document.createElement("script");
  loader.type = "text/javascript";
  loader.async = true;
  loader.src = src;
  var first = scripts[0];
  first.parentNode.insertBefore(loader, first);
})();
</script><gcse:search></gcse:search>
<a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>
// Desktop header: reveal the search bar and hide the icon bar in its place.
function showSearch() {
  addPlaceholder();
  var searchBar = document.querySelector(".searchBar");
  searchBar.classList.remove("disappear");
  var iconsBar = document.querySelector("#iconsBar");
  iconsBar.classList.add("disappear");
}

// Set the same hint text on every text <input> on the page
// (presumably only the search boxes rendered by cse.js — confirm).
function addPlaceholder() {
  $("input:text").attr("placeholder", "What are you looking for?");
}

// Desktop header: hide the search bar and bring the icon bar back.
function endSearch() {
  var searchBar = document.querySelector(".searchBar");
  searchBar.classList.add("disappear");
  var iconsBar = document.querySelector("#iconsBar");
  iconsBar.classList.remove("disappear");
}

// Flip the "fixedPosition" class on <body>; presumably styled to stop page
// scrolling while the mobile menu is open (TODO confirm in main.scss).
function blockScroll() {
  $("body").toggleClass("fixedPosition");
}

// Click handler for the mobile "Toggle navigation" button.
function openMenu() {
  addPlaceholder();
  blockScroll();
}
</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Contribute</span></li><li><a href=https://github.com/apache/beam/blob/master/CONTRIBUTING.md>Code contribution guide</a></li><li><a href=/contribute/get-help/>Get help</a></li><li><a href=/contribute/attributes/>Attributes of a Beam community member</a></li><li><span class=section-nav-list-title>Technical Docs</span><ul class=section-nav-list><li><a href=https://cwiki.apache.org/confluence/display/BEAM/Contribution+Testing+Guide>Testing guide</a></li><li><a href=/contribute/precommit-triage-guide/>Pre-commit slowness triage</a></li><li><a href=/contribute/ptransform-style-guide/>PTransform style guide</a></li><li><a href=/contribute/runner-guide/>Runner authoring guide</a></li><li><a href=/contribute/dependencies/>Dependencies guide</a></li></ul></li><li><span class=section-nav-list-title>Policies</span><ul class=section-nav-list><li><a href=/contribute/issue-priorities/>Issue priorities</a></li><li><a href=/contribute/precommit-policies/>Pre-commit test policies</a></li><li><a href=/contribute/postcommits-policies/>Post-commit test policies</a></li><li><a href=/contribute/release-blockers/>Release 
blockers</a></li></ul></li><li><span class=section-nav-list-title>Committers</span><ul class=section-nav-list><li><a href=/contribute/become-a-committer/>Become a committer</a></li></ul></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#implementing-the-beam-primitives>Implementing the Beam Primitives</a><ul><li><a href=#what-if-you-havent-implemented-some-of-these-features>What if you haven&rsquo;t implemented some of these features?</a></li><li><a href=#implementing-the-impulse-primitive>Implementing the Impulse primitive</a></li><li><a href=#implementing-the-pardo-primitive>Implementing the ParDo primitive</a><ul><li><a href=#bundles>Bundles</a></li><li><a href=#the-dofn-lifecycle>The DoFn Lifecycle</a></li><li><a href=#side-inputs>Side Inputs</a></li><li><a href=#state-and-timers>State and Timers</a></li><li><a href=#splittable-dofn>Splittable DoFn</a></li></ul></li><li><a href=#implementing-the-groupbykey-and-window-primitive>Implementing the GroupByKey (and window) primitive</a><ul><li><a href=#group-by-encoded-bytes>Group By Encoded Bytes</a></li><li><a href=#window-merging>Window Merging</a></li><li><a href=#implementing-via-groupbykeyonly--groupalsobywindow>Implementing via GroupByKeyOnly + GroupAlsoByWindow</a></li><li><a href=#dropping-late-data>Dropping late data</a></li><li><a href=#triggering>Triggering</a></li><li><a href=#timestampcombiner>TimestampCombiner</a></li></ul></li><li><a href=#implementing-the-window-primitive>Implementing the Window primitive</a></li><li><a href=#implementing-the-flatten-primitive>Implementing the Flatten primitive</a></li><li><a href=#special-mention-the-combine-composite>Special mention: the Combine composite</a></li></ul></li><li><a href=#working-with-pipelines>Working with pipelines</a></li><li><a href=#testing-your-runner>Testing your runner</a></li><li><a href=#integrating-your-runner-nicely-with-sdks>Integrating your runner nicely 
with SDKs</a><ul><li><a href=#integrating-with-the-java-sdk>Integrating with the Java SDK</a><ul><li><a href=#allowing-users-to-pass-options-to-your-runner>Allowing users to pass options to your runner</a></li><li><a href=#registering-your-runner-with-sdks-for-command-line-use>Registering your runner with SDKs for command line use</a></li></ul></li><li><a href=#integrating-with-the-python-sdk>Integrating with the Python SDK</a></li></ul></li><li><a href=#writing-an-sdk-independent-runner>Writing an SDK-independent runner</a><ul><li><a href=#the-fn-api>The Fn API</a></li><li><a href=#the-runner-api>The Runner API</a></li></ul></li><li><a href=#the-runner-api-protos>The Runner API protos</a><ul><li><a href=#functionspec-proto><code>FunctionSpec</code> proto</a></li><li><a href=#primitive-transform-payload-protos>Primitive transform payload protos</a><ul><li><a href=#pardopayload-proto><code>ParDoPayload</code> proto</a></li><li><a href=#combinepayload-proto><code>CombinePayload</code> proto</a></li></ul></li><li><a href=#ptransform-proto><code>PTransform</code> proto</a></li><li><a href=#pcollection-proto><code>PCollection</code> proto</a></li><li><a href=#coder-proto><code>Coder</code> proto</a></li></ul></li><li><a href=#the-jobs-api-rpcs>The Jobs API RPCs</a></li></ul></nav></nav><div class="body__contained body__section-nav"><h1 id=runner-authoring-guide>Runner Authoring Guide</h1><p>This guide walks through how to implement a new runner. It is aimed at someone
who has a data processing system and wants to use it to execute a Beam
pipeline. The guide starts from the basics, to help you evaluate the work
ahead. Then the sections become more and more detailed, to be a resource
throughout the development of your runner.</p><p>Topics covered:</p><nav id=TableOfContents><ul><li><a href=#implementing-the-beam-primitives>Implementing the Beam Primitives</a><ul><li><a href=#what-if-you-havent-implemented-some-of-these-features>What if you haven&rsquo;t implemented some of these features?</a></li><li><a href=#implementing-the-impulse-primitive>Implementing the Impulse primitive</a></li><li><a href=#implementing-the-pardo-primitive>Implementing the ParDo primitive</a><ul><li><a href=#bundles>Bundles</a></li><li><a href=#the-dofn-lifecycle>The DoFn Lifecycle</a></li><li><a href=#side-inputs>Side Inputs</a></li><li><a href=#state-and-timers>State and Timers</a></li><li><a href=#splittable-dofn>Splittable DoFn</a></li></ul></li><li><a href=#implementing-the-groupbykey-and-window-primitive>Implementing the GroupByKey (and window) primitive</a><ul><li><a href=#group-by-encoded-bytes>Group By Encoded Bytes</a></li><li><a href=#window-merging>Window Merging</a></li><li><a href=#implementing-via-groupbykeyonly--groupalsobywindow>Implementing via GroupByKeyOnly + GroupAlsoByWindow</a></li><li><a href=#dropping-late-data>Dropping late data</a></li><li><a href=#triggering>Triggering</a></li><li><a href=#timestampcombiner>TimestampCombiner</a></li></ul></li><li><a href=#implementing-the-window-primitive>Implementing the Window primitive</a></li><li><a href=#implementing-the-flatten-primitive>Implementing the Flatten primitive</a></li><li><a href=#special-mention-the-combine-composite>Special mention: the Combine composite</a></li></ul></li><li><a href=#working-with-pipelines>Working with pipelines</a></li><li><a href=#testing-your-runner>Testing your runner</a></li><li><a href=#integrating-your-runner-nicely-with-sdks>Integrating your runner nicely with SDKs</a><ul><li><a href=#integrating-with-the-java-sdk>Integrating with the Java SDK</a><ul><li><a href=#allowing-users-to-pass-options-to-your-runner>Allowing users to pass options to your 
runner</a></li><li><a href=#registering-your-runner-with-sdks-for-command-line-use>Registering your runner with SDKs for command line use</a></li></ul></li><li><a href=#integrating-with-the-python-sdk>Integrating with the Python SDK</a></li></ul></li><li><a href=#writing-an-sdk-independent-runner>Writing an SDK-independent runner</a><ul><li><a href=#the-fn-api>The Fn API</a></li><li><a href=#the-runner-api>The Runner API</a></li></ul></li><li><a href=#the-runner-api-protos>The Runner API protos</a><ul><li><a href=#functionspec-proto><code>FunctionSpec</code> proto</a></li><li><a href=#primitive-transform-payload-protos>Primitive transform payload protos</a><ul><li><a href=#pardopayload-proto><code>ParDoPayload</code> proto</a></li><li><a href=#combinepayload-proto><code>CombinePayload</code> proto</a></li></ul></li><li><a href=#ptransform-proto><code>PTransform</code> proto</a></li><li><a href=#pcollection-proto><code>PCollection</code> proto</a></li><li><a href=#coder-proto><code>Coder</code> proto</a></li></ul></li><li><a href=#the-jobs-api-rpcs>The Jobs API RPCs</a></li></ul></nav><h2 id=implementing-the-beam-primitives>Implementing the Beam Primitives</h2><p>Aside from encoding and persisting data - which presumably your engine already
does in some way or another - most of what you need to do is implement the Beam
primitives. This section provides a detailed look at each primitive, covering
what you need to know that might not be obvious and what support code is
provided.</p><p>The primitives are designed for the benefit of pipeline authors, not runner
authors. Each represents a different conceptual mode of operation (external IO,
element-wise, grouping, windowing, union) rather than a specific implementation
decision. The same primitive may require a very different implementation based
on how the user instantiates it. For example, a <code>ParDo</code> that uses state or
timers may require key partitioning, and a <code>GroupByKey</code> with speculative triggering
may require a more costly or complex implementation.</p><h3 id=what-if-you-havent-implemented-some-of-these-features>What if you haven&rsquo;t implemented some of these features?</h3><p>That&rsquo;s OK! You don&rsquo;t have to do it all at once, and there may even be features
that don&rsquo;t make sense for your runner to ever support. We maintain a
<a href=/documentation/runners/capability-matrix/>capability matrix</a> on the Beam site so you can tell
users what you support. When you receive a <code>Pipeline</code>, you should traverse it
and determine whether or not you can execute each <code>DoFn</code> that you find. If
you cannot execute some <code>DoFn</code> in the pipeline (or if there is any other
requirement that your runner lacks) you should reject the pipeline. In your
native environment, this may look like throwing an
<code>UnsupportedOperationException</code>. The Runner API RPCs will make this explicit,
for cross-language portability.</p><h3 id=implementing-the-impulse-primitive>Implementing the Impulse primitive</h3><p><code>Impulse</code> is a PTransform that takes no inputs and produces exactly one output
during the lifetime of the pipeline: an empty byte string in the
global window with the minimum timestamp. This has the encoded value of
<code>7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00</code> when encoded with the standard
windowed value coder.</p><p>Though <code>Impulse</code> is generally not invoked by a user, it is the only root
primitive operation, and other root operations (like <code>Read</code>s and <code>Create</code>)
are composite operations constructed from an <code>Impulse</code> followed by a series
of (possibly Splittable) <code>ParDo</code>s.</p><h3 id=implementing-the-pardo-primitive>Implementing the ParDo primitive</h3><p>The <code>ParDo</code> primitive describes element-wise transformation for a
<code>PCollection</code>. <code>ParDo</code> is the most complex primitive, because it is where any
per-element processing is described. In addition to very simple operations like
standard <code>map</code> or <code>flatMap</code> from functional programming, <code>ParDo</code> also supports
multiple outputs, side inputs, initialization, flushing, teardown, and stateful
processing.</p><p>The UDF that is applied to each element is called a <code>DoFn</code>. The exact APIs for
a <code>DoFn</code> can vary per language/SDK but generally follow the same pattern, so we
can discuss it with pseudocode. I will also often refer to the Java support
code, since I know it and most of our current and future runners are
Java-based.</p><p>Generally, rather than applying a series of <code>ParDo</code>s one at a time over the
entire input data set, it is more efficient to fuse several <code>ParDo</code>s together
in a single executable stage that consists of a whole series (in general,
a DAG) of mapping operations. In addition to <code>ParDo</code>s, windowing operations,
local (pre- or post-GBK) combining operations, and other mapping operations
may be fused into these stages as well.</p><p>As DoFns may execute code in a different language, or require a different
environment, than the runner itself, Beam provides the ability to call these
in a cross-process way. This is the crux of the
<a href=https://beam.apache.org/contribute/runner-guide/#writing-an-sdk-independent-runner>Beam Fn API</a>,
for which more detail can be found below.
It is, however, perfectly acceptable for a runner to invoke this user code
in process (for simplicity or efficiency) when the environments are
compatible.</p><h4 id=bundles>Bundles</h4><p>For correctness, a <code>DoFn</code> <em>should</em> represent an element-wise function, but in
most SDKs this is a long-lived object that processes elements in small groups
called bundles.</p><p>Your runner decides how many elements, and which elements, to include in a
bundle, and can even decide dynamically in the middle of processing that the
current bundle has &ldquo;ended&rdquo;. How a bundle is processed ties in with the rest of
a DoFn&rsquo;s lifecycle.</p><p>It will generally improve throughput to make the largest bundles possible, so
that initialization and finalization costs are amortized over many elements.
But if your data is arriving as a stream, then you will want to terminate a
bundle in order to achieve appropriate latency, so bundles may be just a few
elements.</p><p>A bundle is the unit of commitment in Beam. If an error is encountered while
processing a bundle, all the prior outputs of that bundle (including any
modifications to state or timers) must be discarded by the runner and the
entire bundle retried. Upon successful completion of a bundle, its outputs,
together with any state/timer modifications and watermark updates, must be
committed atomically.</p><h4 id=the-dofn-lifecycle>The DoFn Lifecycle</h4><p><code>DoFn</code>s in many SDKs have several methods such as <code>setup</code>, <code>start_bundle</code>,
<code>finish_bundle</code>, <code>teardown</code>, etc. in addition to the standard,
element-wise <code>process</code> calls. Generally proper invocation of
<a href=https://beam.apache.org/documentation/programming-guide/#dofn>this lifecycle</a>
should be handled for you when invoking one or more
<code>DoFn</code>s from the standard bundle processors (either via the FnAPI or directly
using a BundleProcessor
(<a href=https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java>Java</a>,
<a href=https://github.com/apache/beam/blob/release-2.49.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L852>Python</a>)).
SDK-independent runners should never have to worry about these details directly.</p><h4 id=side-inputs>Side Inputs</h4><p><em>Main design document:
<a href=https://s.apache.org/beam-side-inputs-1-pager>https://s.apache.org/beam-side-inputs-1-pager</a></em></p><p>A side input is a global view of a window of a <code>PCollection</code>. This distinguishes
it from the main input, which is processed one element at a time. The SDK/user
prepares a <code>PCollection</code> adequately, the runner materializes it, and then the
runner feeds it to the <code>DoFn</code>.</p><p>Unlike main input data, which is <em>pushed</em> by the runner to the <code>ParDo</code> (generally
via the FnAPI Data channel), side input data is <em>pulled</em> by the <code>ParDo</code>
from the runner (generally over the FnAPI State channel).</p><p>A side input is accessed via a specific <code>access_pattern</code>.
There are currently two access patterns enumerated in the
<code>StandardSideInputTypes</code> proto: <code>beam:side_input:iterable:v1</code> which indicates
the runner must return all values in a PCollection corresponding to a specific
window and <code>beam:side_input:multimap:v1</code> which indicates the runner must return
all values corresponding to a specific key and window.
Being able to serve these access patterns efficiently may influence how a
runner materializes this PCollection.</p><p>SideInputs can be detected by looking at the <code>side_inputs</code> map in the
<code>ParDoPayload</code> of <code>ParDo</code> transforms.
The <code>ParDo</code> operation itself is responsible for invoking the
<code>window_mapping_fn</code> (before invoking the runner) and <code>view_fn</code> (on the
runner-returned values), so the runner need not concern itself with these
fields.</p><p>When a side input is needed but the side input has no data associated with it
for a given window, elements in that window must be deferred until the side
input has some data or the watermark has advanced sufficiently such that
we can be sure there will be no data for that window. The
<a href=https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java><code>PushbackSideInputDoFnRunner</code></a>
is an example of implementing this.</p><h4 id=state-and-timers>State and Timers</h4><p><em>Main design document: <a href=https://s.apache.org/beam-state>https://s.apache.org/beam-state</a></em></p><p>When a <code>ParDo</code> includes state and timers, its execution on your runner is usually
very different. In particular, the state must be persisted when the bundle
completes and retrieved for future bundles. Timers that are set must also be
injected into future bundles as the watermark advances sufficiently.</p><p>State and timers are partitioned per key and window, that is, a <code>DoFn</code>
processing a given key must have a consistent view of the state and timers
across all elements that share this key. You may need or want to
explicitly shuffle data to support this.
Once the watermark has passed the end of the window (plus an allowance for
allowed lateness, if any), state associated with this window can be dropped.</p><p>State setting and retrieval is performed on the FnAPI State channel, whereas
timer setting and firing happens on the FnAPI Data channel.</p><h4 id=splittable-dofn>Splittable DoFn</h4><p><em>Main design document: <a href=https://s.apache.org/splittable-do-fn>https://s.apache.org/splittable-do-fn</a></em></p><p>Splittable <code>DoFn</code> is a generalization of <code>ParDo</code> that is useful for high-fanout
mappings that can be done in parallel. The prototypical example of such an
operation is reading from a file, where a single file name (as an input element)
can be mapped to all the elements contained in that file.
The <code>DoFn</code> is considered splittable in the sense that an element representing,
say, a single file can be split (e.g. into ranges of that file) to be processed
(e.g. read) by different workers.
The full power of this primitive is in the fact that these splits can happen
dynamically rather than just statically (i.e. ahead of time) avoiding the
problem of over- or undersplitting.</p><p>A full explanation of Splittable <code>DoFn</code> is out of scope for this doc, but
here is a brief overview as it pertains to its execution.</p><p>A Splittable <code>DoFn</code> can participate in the dynamic splitting protocol by
splitting within an element as well as between elements. Dynamic splitting
is triggered by the runner issuing <code>ProcessBundleSplitRequest</code> messages on
the control channel. The SDK will commit to process just a portion of the
indicated element and return a description of the remainder (i.e. the
unprocessed portion) to the runner in the <code>ProcessBundleSplitResponse</code>
to be scheduled by the runner (e.g. on a different worker or as part of a
different bundle).</p><p>A Splittable <code>DoFn</code> can also initiate its own splitting, indicating it has
processed an element as far as it can for the moment (e.g. when tailing a file)
but more remains. These most often occur when reading unbounded sources.
In this case a set of elements representing the deferred work are passed back
in the <code>residual_roots</code> field of the <code>ProcessBundleResponse</code>.
At a future time, the runner must re-invoke these same operations with
the elements given in <code>residual_roots</code>.</p><h3 id=implementing-the-groupbykey-and-window-primitive>Implementing the GroupByKey (and window) primitive</h3><p>The <code>GroupByKey</code> operation (sometimes called GBK for short) groups a
<code>PCollection</code> of key-value pairs by key and window, emitting results according
to the <code>PCollection</code>&rsquo;s triggering configuration.</p><p>It is quite a bit more elaborate than simply colocating elements with the same
key, and uses many fields from the <code>PCollection</code>&rsquo;s windowing strategy.</p><h4 id=group-by-encoded-bytes>Group By Encoded Bytes</h4><p>For both the key and window, your runner sees them as &ldquo;just bytes&rdquo;. So you need
to group in a way that is consistent with grouping by those bytes, even if you
have some special knowledge of the types involved.</p><p>The elements you are processing will be key-value pairs, and you&rsquo;ll need to extract
the keys. For this reason, the format of key-value pairs is
<a href=https://github.com/apache/beam/blob/release-2.49.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L838>standardized and shared</a>
across all SDKs. See either
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/coders/KvCoder.html><code>KvCoder</code></a>
in Java or
<a href=https://beam.apache.org/releases/pydoc/current/apache_beam.coders.coders.html#apache_beam.coders.coders.TupleCoder><code>TupleCoder</code></a>
in Python for documentation on the binary format.</p><h4 id=window-merging>Window Merging</h4><p>As well as grouping by key, your runner must group elements by their window. A
<code>WindowFn</code> has the option of declaring that it merges windows on a per-key
basis. For example, session windows for the same key will be merged if they
overlap. So your runner must invoke the merge method of the <code>WindowFn</code> during
grouping.</p><h4 id=implementing-via-groupbykeyonly--groupalsobywindow>Implementing via GroupByKeyOnly + GroupAlsoByWindow</h4><p>The Java and Python codebases include support code for a particularly common way of
implementing the full <code>GroupByKey</code> operation: first group the keys, and then group
by window. For merging windows, this is essentially required, since merging is
per key.</p><p>Often presenting the set of values in timestamp order can allow more
efficient grouping of these values into their final windows.</p><h4 id=dropping-late-data>Dropping late data</h4><p><em>Main design document:
<a href=https://s.apache.org/beam-lateness>https://s.apache.org/beam-lateness</a></em></p><p>A window is expired in a <code>PCollection</code> if the watermark of the input PCollection
has exceeded the end of the window by at least the input <code>PCollection</code>&rsquo;s
allowed lateness.</p><p>Data for an expired window can be dropped any time and should be dropped at a
<code>GroupByKey</code>. If you are using <code>GroupAlsoByWindow</code>, drop it just before executing
this transform. You may shuffle less data if you drop data prior to
<code>GroupByKeyOnly</code>, but this can only be done safely for non-merging windows, as a
window that appears expired may merge to become not expired.</p><h4 id=triggering>Triggering</h4><p><em>Main design document:
<a href=https://s.apache.org/beam-triggers>https://s.apache.org/beam-triggers</a></em></p><p>The input <code>PCollection</code>&rsquo;s trigger and accumulation mode specify when and how
outputs should be emitted from the <code>GroupByKey</code> operation.</p><p>In Java, there is a lot of support code for executing triggers in the
<code>GroupAlsoByWindow</code> implementations, <code>ReduceFnRunner</code> (legacy name), and
<code>TriggerStateMachine</code>, which is an obvious way of implementing all triggers as
an event-driven machine over elements and timers.
In Python this is supported by the
<a href=https://github.com/apache/beam/blob/release-2.49.0/sdks/python/apache_beam/transforms/trigger.py#L1199>TriggerDriver</a> classes.</p><h4 id=timestampcombiner>TimestampCombiner</h4><p>When an aggregated output is produced from multiple inputs, the <code>GroupByKey</code>
operation has to choose a timestamp for the combination. To do so, first the
WindowFn has a chance to shift timestamps - this is needed to ensure watermarks
do not prevent progress of windows like sliding windows (the details are beyond
this doc). Then, the shifted timestamps need to be combined - this is specified
by a <code>TimestampCombiner</code>, which can either select the minimum or maximum of its
inputs, or just ignore inputs and choose the end of the window.</p><h3 id=implementing-the-window-primitive>Implementing the Window primitive</h3><p>The window primitive applies a <code>WindowFn</code> UDF to place each input element into
one or more windows of its output PCollection. Note that the primitive also
generally configures other aspects of the windowing strategy for a <code>PCollection</code>,
but the fully constructed graph that your runner receives will already have a
complete windowing strategy for each <code>PCollection</code>.</p><p>To implement this primitive, you need to invoke the provided WindowFn on each
element, which will return some set of windows for that element to be a part of
in the output <code>PCollection</code>.</p><p>Most runners implement this by fusing these window-altering mappings in with
the <code>DoFns</code>.</p><p><strong>Implementation considerations</strong></p><p>A &ldquo;window&rdquo; is just a second grouping key that has a &ldquo;maximum timestamp&rdquo;. It can
be any arbitrary user-defined type. The <code>WindowFn</code> provides the coder for the
window type.</p><p>Beam&rsquo;s support code provides <code>WindowedValue</code> which is a compressed
representation of an element in multiple windows. You may want to use this,
or your own compressed representation. Remember that it simply represents
multiple elements at the same time; there is no such thing as an element &ldquo;in
multiple windows&rdquo;.</p><p>For values in the global window, you may want to use an even further compressed
representation that doesn&rsquo;t bother including the window at all.</p><p>We provide coders with these optimizations such as
<a href=https://github.com/apache/beam/blob/release-2.49.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L968><code>PARAM_WINDOWED_VALUE</code></a>
that can be used to reduce the size of serialized data.</p><p>In the future, this primitive may be retired as it can be implemented as a
ParDo if the capabilities of ParDo are enhanced to allow output to new windows.</p><h3 id=implementing-the-flatten-primitive>Implementing the Flatten primitive</h3><p>This one is easy - take as input a finite set of <code>PCollections</code> and output their
bag union, keeping windows intact.</p><p>For this operation to make sense, it is the SDK&rsquo;s responsibility to make sure
the windowing strategies are compatible.</p><p>Also note that there is no requirement that the coders for all the <code>PCollections</code>
be the same. If your runner wants to require that (to avoid tedious
re-encoding) you have to enforce it yourself. Or you could just implement the
fast path as an optimization.</p><h3 id=special-mention-the-combine-composite>Special mention: the Combine composite</h3><p>A composite transform that is almost always treated specially by a runner is
<code>CombinePerKey</code>, which applies an associative and commutative operator to
the elements of a <code>PCollection</code>. This composite is not a primitive. It is
implemented in terms of <code>ParDo</code> and <code>GroupByKey</code>, so your runner will work
without treating it - but it does carry additional information that you
probably want to use for optimizations: the associative-commutative operator,
known as a <code>CombineFn</code>.</p><p>Generally runners will want to implement this via what is called
combiner lifting, where a new operation is placed before the <code>GroupByKey</code>
that does partial (within-bundle) combining, which often requires a slight
modification of what comes after the <code>GroupByKey</code> as well.
An example of this transformation can be found in the
<a href=https://github.com/apache/beam/blob/release-2.49.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1193>Python</a>
or <a href=https://github.com/apache/beam/blob/release-2.49.0/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go#L67>Go</a>
implementations of this optimization.
The resulting pre- and post-<code>GroupByKey</code> operations are generally fused in with
the <code>ParDo</code>s and executed as above.</p><h2 id=working-with-pipelines>Working with pipelines</h2><p>When you receive a pipeline from a user, you will need to translate it.
An explanation of how Beam pipelines are represented can be found
<a href=https://docs.google.com/presentation/d/1atu-QC_mnK2SaeLhc0D78wZYgVOX1fN0H544QmBi3VA>here</a>,
which complements the <a href=https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto>official proto declarations</a>.</p><h2 id=testing-your-runner>Testing your runner</h2><p>The Beam Java SDK and Python SDK have suites of runner validation tests. The
configuration may evolve faster than this document, so check the configuration
of other Beam runners. But be aware that we have tests and you can use them
very easily! To enable these tests in a Java-based runner using Gradle, you
scan the dependencies of the SDK for tests with the JUnit category
<code>ValidatesRunner</code>.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>task validatesRunner(type: Test) {
group = &#34;Verification&#34;
description = &#34;Validates the runner&#34;
def pipelineOptions = JsonOutput.toJson([&#34;--runner=MyRunner&#34;, ... misc test options ...])
systemProperty &#34;beamTestPipelineOptions&#34;, pipelineOptions
classpath = configurations.validatesRunner
testClassesDirs = files(project(&#34;:sdks:java:core&#34;).sourceSets.test.output.classesDirs)
useJUnit {
includeCategories &#39;org.apache.beam.sdk.testing.ValidatesRunner&#39;
}
}</code></pre></div></div><p>Enabling these tests in other languages is unexplored.</p><h2 id=integrating-your-runner-nicely-with-sdks>Integrating your runner nicely with SDKs</h2><p>Whether or not your runner is based in the same language as an SDK (such as
Java), you will want to provide a shim to invoke it from another SDK if you
want the users of that SDK (such as Python) to use it.</p><h3 id=integrating-with-the-java-sdk>Integrating with the Java SDK</h3><h4 id=allowing-users-to-pass-options-to-your-runner>Allowing users to pass options to your runner</h4><p>The mechanism for configuration is
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/options/PipelineOptions.html><code>PipelineOptions</code></a>,
an interface that works completely differently than normal Java objects. Forget
what you know, and follow the rules, and <code>PipelineOptions</code> will treat you well.</p><p>You must implement a sub-interface for your runner with getters and setters
with matching names, like so:</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>public interface MyRunnerOptions extends PipelineOptions {
@Description(&#34;The Foo to use with MyRunner&#34;)
@Required
public Foo getMyRequiredFoo();
public void setMyRequiredFoo(Foo newValue);
@Description(&#34;Enable Baz; on by default&#34;)
@Default.Boolean(true)
public Boolean isBazEnabled();
public void setBazEnabled(Boolean newValue);
}</code></pre></div></div><p>You can set up defaults, etc. See the javadoc for details. When your runner is
instantiated with a <code>PipelineOptions</code> object, you access your interface by
<code>options.as(MyRunnerOptions.class)</code>.</p><p>To make these options available on the command line, you register your options
with a <code>PipelineOptionsRegistrar</code>. It is easy if you use <code>@AutoService</code>:</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>@AutoService(PipelineOptionsRegistrar.class)
public static class MyOptionsRegistrar implements PipelineOptionsRegistrar {
@Override
public Iterable&lt;Class&lt;? extends PipelineOptions&gt;&gt; getPipelineOptions() {
return ImmutableList.&lt;Class&lt;? extends PipelineOptions&gt;&gt;of(MyRunnerOptions.class);
}
}</code></pre></div></div><h4 id=registering-your-runner-with-sdks-for-command-line-use>Registering your runner with SDKs for command line use</h4><p>To make your runner available on the command line, you register your runner
with a <code>PipelineRunnerRegistrar</code>. It is easy if you use <code>@AutoService</code>:</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>@AutoService(PipelineRunnerRegistrar.class)
public static class MyRunnerRegistrar implements PipelineRunnerRegistrar {
@Override
public Iterable&lt;Class&lt;? extends PipelineRunner&gt;&gt; getPipelineRunners() {
return ImmutableList.&lt;Class&lt;? extends PipelineRunner&gt;&gt;of(MyRunner.class);
}
}</code></pre></div></div><h3 id=integrating-with-the-python-sdk>Integrating with the Python SDK</h3><p>In the Python SDK the registration of the code is not automatic, so there are a
few things to keep in mind when creating a new runner.</p><p>Any dependencies on packages for the new runner should be optional, so create a
new target in <code>extras_require</code> in <code>setup.py</code> for the packages needed by the new runner.</p><p>All runner code should go in its own package in the <code>apache_beam/runners</code> directory.</p><p>Register the new runner in the <code>create_runner</code> function of <code>runner.py</code> so that the
partial name is matched with the correct class to be used.</p><p>Python Runners can also be identified (e.g. when passing the runner parameter)
by their fully qualified name whether or not they live in the Beam repository.</p><h2 id=writing-an-sdk-independent-runner>Writing an SDK-independent runner</h2><p>There are two aspects to making your runner SDK-independent, able to run
pipelines written in other languages: The Fn API and the Runner API.</p><h3 id=the-fn-api>The Fn API</h3><p><em>Design documents:</em></p><ul><li><p><em><a href=https://s.apache.org/beam-fn-api>https://s.apache.org/beam-fn-api</a></em></p></li><li><p><em><a href=https://s.apache.org/beam-fn-api-processing-a-bundle>https://s.apache.org/beam-fn-api-processing-a-bundle</a></em></p></li><li><p><em><a href=https://s.apache.org/beam-fn-api-send-and-receive-data>https://s.apache.org/beam-fn-api-send-and-receive-data</a></em></p></li><li><p><em><a href="https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_0_317">Overview</a></em></p></li><li><p><em><a href=https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto>Spec</a></em></p></li></ul><p>To run a user&rsquo;s pipeline, you need to be able to invoke their UDFs. The Fn API
is an RPC interface for the standard UDFs of Beam, implemented using protocol
buffers over gRPC.</p><p>The Fn API includes:</p><ul><li>APIs for registering a subgraph of UDFs</li><li>APIs for streaming elements of a bundle</li><li>Shared data formats (key-value pairs, timestamps, iterables, etc.)</li></ul><p>You are fully welcome to <em>also</em> use the SDK for your language for utility code,
or provide optimized implementations of bundle processing for same-language
UDFs.</p><h3 id=the-runner-api>The Runner API</h3><p>The <a href="https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_3736">Runner API</a>
is an SDK-independent schema for a pipeline along with RPC
interfaces for launching a pipeline and checking the status of a job.
By examining a pipeline only through Runner API
interfaces, you remove your runner&rsquo;s dependence on the SDK for its language for
pipeline analysis and job translation.</p><p>To execute such an SDK-independent pipeline, you will need to support the Fn
API. UDFs are embedded in the pipeline as a specification of the function
(often just opaque serialized bytes for a particular language) plus a
specification of an environment that can execute it (essentially a particular
SDK). So far, this specification is expected to be a URI for a Docker container
hosting the SDK&rsquo;s Fn API harness.</p><p>You are fully welcome to <em>also</em> use the SDK for your language, which may offer
useful utility code.</p><p>The language-independent definition of a pipeline is described via a protocol
buffers schema, covered below for reference. But your runner <em>need not</em>
directly manipulate protobuf messages. Instead, the Beam codebase provides
utilities for working with pipelines so that you don&rsquo;t need to be aware of
whether or not the pipeline has ever been serialized or transmitted, or what
language it may have been written in to begin with.</p><p><strong>Java</strong></p><p>If your runner is Java-based, the tools to interact with pipelines in an
SDK-agnostic manner are in the <code>beam-runners-core-construction-java</code>
artifact, in the <code>org.apache.beam.sdk.util.construction</code> namespace.
The utilities are named consistently, like so:</p><ul><li><code>PTransformTranslation</code> - registry of known transforms and standard URNs</li><li><code>ParDoTranslation</code> - utilities for working with <code>ParDo</code> in a
language-independent manner</li><li><code>WindowIntoTranslation</code> - same for <code>Window</code></li><li><code>FlattenTranslation</code> - same for <code>Flatten</code></li><li><code>WindowingStrategyTranslation</code> - same for windowing strategies</li><li><code>CoderTranslation</code> - same for coders</li><li>&mldr; etc, etc &mldr;</li></ul><p>By inspecting transforms only through these classes, your runner will not
depend on the particulars of the Java SDK.</p><h2 id=the-runner-api-protos>The Runner API protos</h2><p>The <a href=https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto>Runner
API</a>
refers to a specific manifestation of the concepts in the Beam model, as a
protocol buffers schema. Even though you should not manipulate these messages
directly, it can be helpful to know the canonical data that makes up a
pipeline.</p><p>Most of the API is exactly the same as the high-level description; you can get
started implementing a runner without understanding all the low-level details.</p><p>The most important takeaway of the Runner API for you is that it is a
language-independent definition of a Beam pipeline. You will probably always
interact via a particular SDK&rsquo;s support code wrapping these definitions with
sensible idiomatic APIs, but always be aware that this is the specification and
any other data is not necessarily inherent to the pipeline, but may be
SDK-specific enrichments (or bugs!).</p><p>The UDFs in the pipeline may be written for any Beam SDK, or even multiple in
the same pipeline. So this is where we will start, taking a bottom-up approach
to understanding the protocol buffers definitions for UDFs before going back to
the higher-level, mostly obvious, record definitions.</p><h3 id=functionspec-proto><code>FunctionSpec</code> proto</h3><p>The heart of cross-language portability is the <code>FunctionSpec</code>. This is a
language-independent specification of a function, in the usual programming
sense that includes side effects, etc.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>message FunctionSpec {
string urn;
bytes payload;
}</code></pre></div></div><p>A <code>FunctionSpec</code> includes a URN identifying the function as well as an arbitrary
fixed parameter. For example the (hypothetical) &ldquo;max&rdquo; CombineFn might have the
URN <code>beam:combinefn:max:0.1</code> and a parameter that indicates by what
comparison to take the max.</p><p>For most UDFs in a pipeline constructed using a particular language&rsquo;s SDK, the
URN will indicate that the SDK must interpret it, for example
<code>beam:dofn:javasdk:0.1</code> or <code>beam:dofn:pythonsdk:0.1</code>. The parameter
will contain serialized code, such as a Java-serialized <code>DoFn</code> or a Python
pickled <code>DoFn</code>.</p><p>A <code>FunctionSpec</code> is not only for UDFs. It is just a generic way to name/specify
any function. It is also used as the specification for a <code>PTransform</code>. But when
used in a <code>PTransform</code> it describes a function from <code>PCollection</code> to <code>PCollection</code>
and cannot be specific to an SDK because the runner is in charge of evaluating
transforms and producing <code>PCollections</code>.</p><p>It goes without saying that not every environment will be able to deserialize
every function spec. For this reason <code>PTransform</code>s have an <code>environment_id</code>
field that identifies an environment that is capable of interpreting
the contained URNs. This is a reference to an environment in the environments
map of the Pipeline proto and is typically defined by a docker image (possibly
with some extra dependencies).
There may be other environments that are also capable of
doing so, and a runner is free to use them if it has this knowledge.</p><h3 id=primitive-transform-payload-protos>Primitive transform payload protos</h3><p>The payload for the primitive transforms are just proto serializations of their
specifications. Rather than reproduce their full code here, I will just
highlight the important pieces to show how they fit together.</p><p>It is worth emphasizing again that while you probably will not interact
directly with these payloads, they are the only data that is inherently part of
the transform.</p><h4 id=pardopayload-proto><code>ParDoPayload</code> proto</h4><p>A <code>ParDo</code> transform carries its <code>DoFn</code> in a <code>FunctionSpec</code> and then
provides language-independent specifications for its other features - side
inputs, state declarations, timer declarations, etc.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>message ParDoPayload {
FunctionSpec do_fn;
map&lt;string, SideInput&gt; side_inputs;
map&lt;string, StateSpec&gt; state_specs;
map&lt;string, TimerSpec&gt; timer_specs;
...
}</code></pre></div></div><h4 id=combinepayload-proto><code>CombinePayload</code> proto</h4><p><code>Combine</code> is not a primitive. But non-primitives are perfectly able to carry
additional information for better optimization. The most important thing that a
<code>Combine</code> transform carries is the <code>CombineFn</code> in a <code>FunctionSpec</code> record.
In order to effectively carry out the optimizations desired, it is also
necessary to know the coder for intermediate accumulations, so it also carries
a reference to this coder.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>message CombinePayload {
FunctionSpec combine_fn;
string accumulator_coder_id;
...
}</code></pre></div></div><h3 id=ptransform-proto><code>PTransform</code> proto</h3><p>A <code>PTransform</code> is a function from <code>PCollection</code> to <code>PCollection</code>. This is
represented in the proto using a FunctionSpec.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>message PTransform {
FunctionSpec spec;
repeated string subtransforms;
// Maps from local string names to PCollection ids
map&lt;string, string&gt; inputs;
map&lt;string, string&gt; outputs;
...
}</code></pre></div></div><p>A <code>PTransform</code> may have subtransforms if it is a composite, in which case the
<code>FunctionSpec</code> may be omitted since the subtransforms define its behavior.</p><p>The input and output <code>PCollections</code> are unordered and referred to by a local
name. The SDK decides what this name is, since it will likely be embedded in
serialized UDFs.</p><p>A runner that understands the specification of a given <code>PTransform</code> (whether
primitive or composite), as defined by its <code>FunctionSpec</code>, is free to
substitute it with another <code>PTransform</code> (or set thereof) that has identical
semantics.
This is typically how <code>CombinePerKey</code> is handled, but many other substitutions
can be done as well.</p><h3 id=pcollection-proto><code>PCollection</code> proto</h3><p>A <code>PCollection</code> just stores a coder, windowing strategy, and whether or not it
is bounded.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>message PCollection {
string coder_id;
IsBounded is_bounded;
string windowing_strategy_id;
...
}</code></pre></div></div><h3 id=coder-proto><code>Coder</code> proto</h3><p>This is a very interesting proto. A coder is a parameterized function that may
only be understood by a particular SDK, hence a <code>FunctionSpec</code>, but also
may have component coders that fully define it. For example, a <code>ListCoder</code> is
only a meta-format, while <code>ListCoder(VarIntCoder)</code> is a fully specified format.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>message Coder {
FunctionSpec spec;
repeated string component_coder_ids;
}</code></pre></div></div><p>There are a large number of
<a href=https://github.com/apache/beam/blob/release-2.49.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L829>standard coders</a>
understood by most, if not all,
SDKs. Using these allows for cross-language transforms.</p><h2 id=the-jobs-api-rpcs>The Jobs API RPCs</h2><p><a href="https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_3722">Overview</a>
<a href=https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto>Spec</a></p><p>While your language&rsquo;s SDK may insulate you from touching the Runner
API protos directly, you may need to implement adapters for your runner, to
expose it to another language.
This allows a Python SDK to invoke a Java runner or vice versa.
A typical implementation of this can be found in
<a href=https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/local_job_service.py>local_job_service.py</a>
which is used directly to front several Python-implemented runners.</p><p>The RPCs themselves will necessarily follow the existing APIs of PipelineRunner
and PipelineResult, but altered to be the minimal backend channel, versus a
rich and convenient API.</p><p>A key piece of this is the
<a href=https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_artifact_api.proto>Artifacts API</a>,
which allows a runner to fetch and deploy binary artifacts (such as jars,
PyPI packages, etc.) that are listed as dependencies in the various environments,
and may have various representations. This is invoked after a pipeline
is submitted, but before it is executed. The SDK submitting a pipeline acts
as an artifact server to the runner receiving the request, and in turn the
runner then acts as an artifact server to the workers (environments) hosting
the user&rsquo;s UDFs.</p><div class=feedback><p class=update>Last updated on 2024/05/10</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div 
class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>