blob: 7eed2d203a739c92f26b7a7a9d81856dd29aeb96 [file] [log] [blame]
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8>
<meta http-equiv=x-ua-compatible content="IE=edge">
<meta name=viewport content="width=device-width,initial-scale=1">
<title>Orchestration</title>
<meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes.">
<link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet>
<link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style>
<link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet>
<!-- jQuery is served by a third-party CDN, so its exact bytes are pinned with
     Subresource Integrity; crossorigin=anonymous is required for the browser to
     verify SRI on a cross-origin script. It is parser-blocking (no defer), so `$`
     is defined before the deferred scripts below execute.
     NOTE(review): jQuery 2.2.4 is end-of-life and has published security
     advisories (e.g. CVE-2019-11358, CVE-2020-11022); consider upgrading. -->
<script src=https://code.jquery.com/jquery-2.2.4.min.js integrity="sha256-BbhdlvQf/xTY9gja0Dq3HiwQF8LaCRTXxZKRutelT44=" crossorigin=anonymous></script>
<style>.body__contained img{max-width:100%}</style>
<script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<script type=text/javascript src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script>
<script type=text/javascript src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script>
<script type=text/javascript src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script>
<script type=text/javascript src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script>
<script type=text/javascript src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script>
<script type=text/javascript src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script>
<script type=text/javascript src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script>
<script type=text/javascript src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script>
<script type=text/javascript src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/ml/orchestration/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>
/* Google Analytics (analytics.js) bootstrap: installs a queueing ga() stub on
   window, then inserts the async library loader before the first script element.
   NOTE(review): "UA-" IDs are Universal Analytics properties, which Google has
   stopped processing; confirm whether this still records anything and migrate
   to a GA4 tag or remove it. */
(function (win, doc, tagName, src, globalName, loader, firstScript) {
  win.GoogleAnalyticsObject = globalName;
  win[globalName] = win[globalName] || function () {
    (win[globalName].q = win[globalName].q || []).push(arguments);
  };
  win[globalName].l = 1 * new Date();
  loader = doc.createElement(tagName);
  firstScript = doc.getElementsByTagName(tagName)[0];
  loader.async = 1;
  loader.src = src;
  firstScript.parentNode.insertBefore(loader, firstScript);
})(window, document, "script", "https://www.google-analytics.com/analytics.js", "ga");
ga("create", "UA-73650088-1", "auto");
ga("send", "pageview");
</script><script>
/* Hotjar bootstrap: installs a queueing hj() stub and settings on window, then
   appends the async loader script to the document head. */
(function (win, doc, urlPrefix, urlSuffix, head, loader) {
  win.hj = win.hj || function () {
    (win.hj.q = win.hj.q || []).push(arguments);
  };
  win._hjSettings = { hjid: 2182187, hjsv: 6 };
  head = doc.getElementsByTagName("head")[0];
  loader = doc.createElement("script");
  loader.async = 1;
  loader.src = urlPrefix + win._hjSettings.hjid + urlSuffix + win._hjSettings.hjsv;
  head.appendChild(loader);
})(window, document, "https://static.hotjar.com/c/hotjar-", ".js?sv=");
</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><!-- The logo is the link's only content, so its alt names the destination. --><img alt="Apache Beam home" style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class=navbar-link href=/get-started/>Get Started</a>
<a class=navbar-link href=/documentation/>Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><!-- Icon-only link: aria-label supplies the accessible name; the SVG is decorative. --><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/ml/orchestration.md aria-label="Edit this page on GitHub" data-proofer-ignore><svg aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><!-- The feather is decorative: the visible "Apache" text that follows names the toggle. --><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class=navbar-link href=/community/>Community</a>
<a class=navbar-link href=/contribute/>Contribute</a>
<a class=navbar-link href=/blog/>Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><!-- Search trigger has no href, so it is given an explicit button role, a tab stop, a name, and Enter/Space activation to match the WAI-ARIA button pattern. --><a type=button role=button tabindex=0 aria-label=Search onclick=showSearch() onkeydown="if(event.key==='Enter'||event.key===' '){event.preventDefault();showSearch()}"><svg aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><!-- Icon-only link opening a new tab: aria-label names it and announces the new tab. --><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/ml/orchestration.md aria-label="Edit this page on GitHub (opens in a new tab)" data-proofer-ignore><svg aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>
/* Google Programmable Search loader: injects the cse.js script for this engine
   ID, asynchronously, ahead of the first script element in the document. */
(function () {
  var engineId = "012923275103528129024:4emlchv9wzi";
  var loader = document.createElement("script");
  loader.type = "text/javascript";
  loader.async = true;
  loader.src = "https://cse.google.com/cse.js?cx=" + engineId;
  var firstScript = document.getElementsByTagName("script")[0];
  firstScript.parentNode.insertBefore(loader, firstScript);
})();
</script><gcse:search></gcse:search>
<!-- Close trigger has no href, so it is given an explicit button role, a tab stop, a name, and Enter/Space activation to match the WAI-ARIA button pattern. --><a type=button role=button tabindex=0 aria-label="Close search" onclick=endSearch() onkeydown="if(event.key==='Enter'||event.key===' '){event.preventDefault();endSearch()}"><svg aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>
/* Header helpers invoked from inline onclick handlers in the navigation bars.
   They are declared as top-level functions so those handlers can reach them
   as globals. */

// Reveal the desktop search bar (after setting its placeholder) and hide the
// icon row that holds the search trigger.
function showSearch() {
  addPlaceholder();
  document.querySelector(".searchBar").classList.remove("disappear");
  document.querySelector("#iconsBar").classList.add("disappear");
}

// Set the same placeholder text on every text input currently in the page.
function addPlaceholder() {
  var hint = "What are you looking for?";
  $("input:text").attr("placeholder", hint);
}

// Hide the desktop search bar again and bring the icon row back.
function endSearch() {
  document.querySelector(".searchBar").classList.add("disappear");
  document.querySelector("#iconsBar").classList.remove("disappear");
}

// Flip the "fixedPosition" class on <body> on each call.
function blockScroll() {
  $(document.body).toggleClass("fixedPosition");
}

// Mobile menu button handler: prepare the search placeholder, then toggle
// the body's "fixedPosition" class.
function openMenu() {
  addPlaceholder();
  blockScroll();
}
</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Concepts</span><ul class=section-nav-list><li><a href=/documentation/basics/>Basics of the Beam model</a></li><li><a href=/documentation/runtime/model/>How Beam executes a pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Beam programming guide</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/>Overview</a></li><li><a href=/documentation/programming-guide/#creating-a-pipeline>Pipelines</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>PCollections</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pcollections>Creating a PCollection</a></li><li><a href=/documentation/programming-guide/#pcollection-characteristics>PCollection characteristics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transforms</span><ul class=section-nav-list><li><a
href=/documentation/programming-guide/#applying-transforms>Applying transforms</a></li><li><span class=section-nav-list-title>Core Beam transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pardo>ParDo</a></li><li><a href=/documentation/programming-guide/#groupbykey>GroupByKey</a></li><li><a href=/documentation/programming-guide/#cogroupbykey>CoGroupByKey</a></li><li><a href=/documentation/programming-guide/#combine>Combine</a></li><li><a href=/documentation/programming-guide/#flatten>Flatten</a></li><li><a href=/documentation/programming-guide/#partition>Partition</a></li></ul></li><li><a href=/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms>Requirements for user code</a></li><li><a href=/documentation/programming-guide/#side-inputs>Side inputs</a></li><li><a href=/documentation/programming-guide/#additional-outputs>Additional outputs</a></li><li><a href=/documentation/programming-guide/#composite-transforms>Composite transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline I/O</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pipeline-io>Using I/O transforms</a></li><li><a href=/documentation/io/connectors/>I/O connectors</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>I/O connector guides</span><ul class=section-nav-list><li><a href=/documentation/io/built-in/parquet/>Apache Parquet I/O connector</a></li><li><a href=/documentation/io/built-in/hadoop/>Hadoop Input/Output Format IO</a></li><li><a href=/documentation/io/built-in/hcatalog/>HCatalog IO</a></li><li><a href=/documentation/io/built-in/google-bigquery/>Google BigQuery I/O connector</a></li><li><a href=/documentation/io/built-in/snowflake/>Snowflake I/O connector</a></li><li><a href=/documentation/io/built-in/cdap/>CDAP I/O connector</a></li><li><a href=/documentation/io/built-in/sparkreceiver/>Spark Receiver I/O 
connector</a></li><li><a href=/documentation/io/built-in/singlestore/>SingleStoreDB I/O connector</a></li><li><a href=/documentation/io/built-in/webapis/>Web APIs I/O connector</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Developing new I/O connectors</span><ul class=section-nav-list><li><a href=/documentation/io/developing-io-overview/>Overview: Developing connectors</a></li><li><a href=/documentation/io/developing-io-java/>Developing connectors (Java)</a></li><li><a href=/documentation/io/developing-io-python/>Developing connectors (Python)</a></li><li><a href=/documentation/io/io-standards/>I/O Standards</a></li></ul></li><li><a href=/documentation/io/testing/>Testing I/O transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Schemas</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#what-is-a-schema>What is a schema</a></li><li><a href=/documentation/programming-guide/#schemas-for-pl-types>Schemas for programming language types</a></li><li><a href=/documentation/programming-guide/#schema-definition>Schema definition</a></li><li><a href=/documentation/programming-guide/#logical-types>Logical types</a></li><li><a href=/documentation/programming-guide/#creating-schemas>Creating schemas</a></li><li><a href=/documentation/programming-guide/#using-schemas>Using schemas</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data encoding and type safety</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#data-encoding-and-type-safety>Data encoding basics</a></li><li><a href=/documentation/programming-guide/#specifying-coders>Specifying coders</a></li><li><a href=/documentation/programming-guide/#default-coders-and-the-coderregistry>Default coders and the CoderRegistry</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Windowing</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#windowing>Windowing basics</a></li><li><a href=/documentation/programming-guide/#provided-windowing-functions>Provided windowing functions</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Setting your PCollection’s windowing function</a></li><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Watermarks and late data</a></li><li><a href=/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements>Adding timestamps to a PCollection’s elements</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Triggers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#triggers>Trigger basics</a></li><li><a href=/documentation/programming-guide/#event-time-triggers>Event time triggers and the default trigger</a></li><li><a href=/documentation/programming-guide/#processing-time-triggers>Processing time triggers</a></li><li><a href=/documentation/programming-guide/#data-driven-triggers>Data-driven triggers</a></li><li><a href=/documentation/programming-guide/#setting-a-trigger>Setting a trigger</a></li><li><a href=/documentation/programming-guide/#composite-triggers>Composite triggers</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Metrics</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#metrics>Metrics basics</a></li><li><a href=/documentation/programming-guide/#types-of-metrics>Types of metrics</a></li><li><a href=/documentation/programming-guide/#querying-metrics>Querying metrics</a></li><li><a href=/documentation/programming-guide/#using-metrics>Using metrics in pipeline</a></li><li><a href=/documentation/programming-guide/#export-metrics>Export metrics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>State and Timers</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#types-of-state>Types of state</a></li><li><a href=/documentation/programming-guide/#deferred-state-reads>Deferred state reads</a></li><li><a href=/documentation/programming-guide/#timers>Timers</a></li><li><a href=/documentation/programming-guide/#garbage-collecting-state>Garbage collecting state</a></li><li><a href=/documentation/programming-guide/#state-timers-examples>State and timers examples</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Splittable DoFns</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#sdf-basics>Basics</a></li><li><a href=/documentation/programming-guide/#sizing-and-progress>Sizing and progress</a></li><li><a href=/documentation/programming-guide/#user-initiated-checkpoint>User-initiated checkpoint</a></li><li><a href=/documentation/programming-guide/#runner-initiated-split>Runner initiated split</a></li><li><a href=/documentation/programming-guide/#watermark-estimation>Watermark estimation</a></li><li><a href=/documentation/programming-guide/#truncating-during-drain>Truncating during drain</a></li><li><a href=/documentation/programming-guide/#bundle-finalization>Bundle finalization</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Multi-language Pipelines</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#create-x-lang-transforms>Creating cross-language transforms</a></li><li><a href=/documentation/programming-guide/#use-x-lang-transforms>Using cross-language transforms</a></li><li><a href=/documentation/programming-guide/#x-lang-transform-runner-support>Runner Support</a></li></ul></li><li><a href=/documentation/programming-guide/#batched-dofns>Batched DoFns</a></li><li><a href=/documentation/programming-guide/#transform-service>Transform service</a></li></ul></li><li class=section-nav-item--collapsible><span 
class=section-nav-list-title>Pipeline development lifecycle</span><ul class=section-nav-list><li><a href=/documentation/pipelines/design-your-pipeline/>Design Your Pipeline</a></li><li><a href=/documentation/pipelines/create-your-pipeline/>Create Your Pipeline</a></li><li><a href=/documentation/pipelines/test-your-pipeline/>Test Your Pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Common pipeline patterns</span><ul class=section-nav-list><li><a href=/documentation/patterns/overview/>Overview</a></li><li><a href=/documentation/patterns/file-processing/>File processing</a></li><li><a href=/documentation/patterns/side-inputs/>Side inputs</a></li><li><a href=/documentation/patterns/pipeline-options/>Pipeline options</a></li><li><a href=/documentation/patterns/custom-io/>Custom I/O</a></li><li><a href=/documentation/patterns/custom-windows/>Custom windows</a></li><li><a href=/documentation/patterns/bigqueryio/>BigQueryIO</a></li><li><a href=/documentation/patterns/ai-platform/>AI Platform</a></li><li><a href=/documentation/patterns/schema/>Schema</a></li><li><a href=/documentation/patterns/bqml/>BigQuery ML</a></li><li><a href=/documentation/patterns/grouping-elements-for-efficient-external-service-calls/>Grouping elements for efficient external service calls</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>AI/ML pipelines</span><ul class=section-nav-list><li><a href=/documentation/ml/overview/>Get started with AI/ML</a></li><li><a href=/documentation/ml/about-ml/>About Beam ML</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Prediction and inference</span><ul class=section-nav-list><li><a href=/documentation/ml/inference-overview/>Overview</a></li><li><a href=/documentation/ml/multi-model-pipelines/>Build a pipeline with multiple models</a></li><li><a href=/documentation/ml/tensorrt-runinference>Build a custom model handler with 
TensorRT</a></li><li><a href=/documentation/ml/large-language-modeling>Use LLM inference</a></li><li><a href=/documentation/ml/multi-language-inference/>Build a multi-language inference pipeline</a></li><li><a href=/documentation/ml/side-input-updates/>Update your model in production</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data processing</span><ul class=section-nav-list><li><a href=/documentation/ml/preprocess-data/>Preprocess data</a></li><li><a href=/documentation/ml/data-processing/>Explore your data</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Workflow orchestration</span><ul class=section-nav-list><li><a href=/documentation/ml/orchestration/>Use ML-OPS workflow orchestrators</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Model training</span><ul class=section-nav-list><li><a href=/documentation/ml/per-entity-training>Per-entity training</a></li><li><a href=/documentation/ml/online-clustering/>Online clustering</a></li><li><a href=/documentation/ml/model-evaluation/>ML model evaluation</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Use cases</span><ul class=section-nav-list><li><a href=/documentation/ml/anomaly-detection/>Build an anomaly detection pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Reference</span><ul class=section-nav-list><li><a href=/documentation/ml/runinference-metrics/>RunInference metrics</a></li><li><a href=/documentation/ml/model-evaluation/>Model validation</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Runtime systems</span><ul class=section-nav-list><li><a href=/documentation/runtime/environments/>Container environments</a></li><li><a href=/documentation/runtime/resource-hints/>Resource hints</a></li><li><a 
href=/documentation/runtime/sdk-harness-config/>SDK Harness Configuration</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transform catalog</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Python</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Enrichment</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/enrichment/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-bigtable/>Bigtable example</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-vertexai/>Vertex AI Feature Store examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/python/elementwise/flatmap/>FlatMap</a></li><li><a href=/documentation/transforms/python/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/python/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/python/elementwise/map/>Map</a></li><li><a href=/documentation/transforms/python/elementwise/mltransform/>MLTransform</a></li><li><a href=/documentation/transforms/python/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/python/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/python/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/python/elementwise/reify/>Reify</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>RunInference</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/runinference/>Overview</a></li><li><a 
href=/documentation/transforms/python/elementwise/runinference-pytorch/>PyTorch examples</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-sklearn/>Sklearn examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/python/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/python/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/python/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/python/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/combineglobally/>CombineGlobally</a></li><li><a href=/documentation/transforms/python/aggregation/combineperkey/>CombinePerKey</a></li><li><a href=/documentation/transforms/python/aggregation/combinevalues/>CombineValues</a></li><li><a href=/documentation/transforms/python/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/python/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/python/aggregation/groupby/>GroupBy</a></li><li><a href=/documentation/transforms/python/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/python/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/python/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/python/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/python/aggregation/min/>Min</a></li><li><a 
href=/documentation/transforms/python/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/python/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/python/aggregation/top/>Top</a></li><li><a href=/documentation/transforms/python/aggregation/tolist/>ToList</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/other/create/>Create</a></li><li><a href=/documentation/transforms/python/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/python/other/reshuffle/>Reshuffle</a></li><li><a href=/documentation/transforms/python/other/windowinto/>WindowInto</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Java</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/java/elementwise/flatmapelements/>FlatMapElements</a></li><li><a href=/documentation/transforms/java/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/java/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/java/elementwise/mapelements/>MapElements</a></li><li><a href=/documentation/transforms/java/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/java/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/java/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/java/elementwise/reify/>Reify</a></li><li><a href=/documentation/transforms/java/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/java/elementwise/values/>Values</a></li><li><a 
href=/documentation/transforms/java/elementwise/withkeys/>WithKeys</a></li><li><a href=/documentation/transforms/java/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/java/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/java/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/combine/>Combine</a></li><li><a href=/documentation/transforms/java/aggregation/combinewithcontext/>CombineWithContext</a></li><li><a href=/documentation/transforms/java/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/java/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/java/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/java/aggregation/hllcount/>HllCount</a></li><li><a href=/documentation/transforms/java/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/java/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/java/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/java/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/java/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/java/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/java/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/other/create/>Create</a></li><li><a href=/documentation/transforms/java/other/flatten/>Flatten</a></li><li><a 
href=/documentation/transforms/java/other/passert/>PAssert</a></li><li><a href=/documentation/transforms/java/other/view/>View</a></li><li><a href=/documentation/transforms/java/other/window/>Window</a></li></ul></li></ul></li></ul></li><li><a href=/documentation/glossary/>Glossary</a></li><li><a href=https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam>Beam Wiki <img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#understanding-the-beam-dag>Understanding the Beam DAG</a></li><li><a href=#orchestrating-frameworks>Orchestrating frameworks</a></li><li><a href=#preprocessing-example>Preprocessing example</a><ul><li><a href=#kubeflow-pipelines-kfp>Kubeflow pipelines (KFP)</a><ul><li><a href=#create-the-kfp-components>Create the KFP components</a></li><li><a href=#create-the-pipeline-definition>Create the pipeline definition</a></li><li><a href=#run-the-kfp-pipeline>Run the KFP pipeline</a></li></ul></li><li><a href=#tensorflow-extended-tfx>Tensorflow Extended (TFX)</a><ul><li><a href=#preprocess>Preprocess</a></li><li><a href=#train>Train</a></li><li><a href=#executing-the-pipeline>Executing the pipeline</a></li></ul></li></ul></li></ul></nav></nav><div class="body__contained body__section-nav arrow-list arrow-list--no-mt"><h1 id=workflow-orchestration>Workflow orchestration</h1><p>This page provides KFP and TFX orchestration examples. It first provides the KFP example, and then it demonstrates how TFX manages functionality that is defined by hand when using KFP.</p><h2 id=understanding-the-beam-dag>Understanding the Beam DAG</h2><p>Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. A concept central to the Apache Beam programming model is the Directed Acyclic Graph (DAG). 
Each Apache Beam pipeline is a DAG that you can construct through the Beam SDK in your programming language of choice (from the set of supported Apache Beam SDKs). Each node of this DAG represents a processing step (<code>PTransform</code>) that accepts a collection of data as input (<code>PCollection</code>) and then outputs a transformed collection of data (<code>PCollection</code>). The edges define how data flows through the pipeline from one processing step to another. The following diagram shows an example pipeline workflow.</p><p><img src=/images/standalone-beam-pipeline.svg alt="A standalone beam pipeline"></p><p>Defining a pipeline and the corresponding DAG does not mean that data starts flowing through the pipeline. To run the pipeline, you need to deploy it to one of the <a href=/documentation/runners/capability-matrix/>supported Beam runners</a>. These distributed processing backends include Apache Flink, Apache Spark, and Google Cloud Dataflow. To run the pipeline locally on your machine for development and debugging purposes, a <a href=/documentation/runners/direct/>Direct Runner</a> is also provided. View the <a href=/documentation/runners/capability-matrix/>runner capability matrix</a> to verify that your chosen runner supports the data processing steps defined in your pipeline, especially when using the Direct Runner.</p><h2 id=orchestrating-frameworks>Orchestrating frameworks</h2><p>Successfully delivering machine learning projects requires more than training a model. A full ML workflow often contains a range of other steps, including data ingestion, data validation, data preprocessing, model evaluation, model deployment, data drift detection, and so on. Furthermore, you need to track metadata and artifacts from your experiments to answer important questions, such as:</p><ul><li>What data was this model trained on and with which training parameters?</li><li>When was this model deployed and what accuracy did it have on a test dataset?
</li></ul><p>Without this knowledge, troubleshooting, monitoring, and improving your ML solutions becomes increasingly difficult as your solutions grow in size.</p><p>The solution: MLOps. MLOps is an umbrella term used to describe best practices and guiding principles that aim to make the development and maintenance of machine learning systems seamless and efficient. MLOps most often entails automating machine learning workflows throughout the model and data lifecycle. Popular frameworks to create these workflow DAGs are <a href=https://www.kubeflow.org/docs/components/pipelines/introduction/>Kubeflow Pipelines</a>, <a href=https://airflow.apache.org/docs/apache-airflow/stable/index.html>Apache Airflow</a>, and <a href=https://www.tensorflow.org/tfx/guide>TFX</a>.</p><p>You can either use an Apache Beam pipeline as a standalone data processing job, or you can make it part of a larger sequence of steps in a workflow. In the latter case, the Apache Beam DAG is one node in the overarching DAG composed by the workflow orchestrator. This workflow thus contains a DAG within a DAG, as illustrated in the following diagram.</p><p><img src=/images/orchestrated-beam-pipeline.svg alt="An Apache Beam pipeline as part of a larger orchestrated workflow"></p><p>The key difference between the Apache Beam DAG and the orchestrating DAG is that the Apache Beam DAG processes data and passes that data between the nodes of its DAG, whereas the orchestration DAG schedules and monitors steps in the workflow and passes execution parameters, metadata, and artifacts between the nodes of the DAG.</p><ul><li>Apache Beam focuses on parallelization and enabling both batch and streaming jobs.</li><li>Examples of orchestration DAG artifacts include trained models and datasets. Such artifacts are often passed by a reference URI and not by value.</li></ul><p>Note: TFX creates a workflow DAG, which needs an orchestrator of its own to run.
<a href=https://www.tensorflow.org/tfx/guide/custom_orchestrator>Natively supported orchestrators for TFX</a> are Airflow, Kubeflow Pipelines, and Apache Beam itself. As mentioned in the <a href=https://www.tensorflow.org/tfx/guide/beam_orchestrator>TFX docs</a>:</p><blockquote><p>&ldquo;Several TFX components rely on Beam for distributed data processing. In addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG. Beam orchestrator uses a different BeamRunner than the one which is used for component data processing.&rdquo;</p></blockquote><p>Caveat: The Beam orchestrator is not meant to be a TFX orchestrator used in production environments. It simply enables debugging TFX pipelines locally on Beam’s Direct Runner without the need for the extra setup required for Airflow or Kubeflow.</p><h2 id=preprocessing-example>Preprocessing example</h2><p>This section describes two orchestrated ML workflows, one with Kubeflow Pipelines (KFP) and one with Tensorflow Extended (TFX). These two frameworks both create workflows but have their own distinct advantages and disadvantages:</p><ul><li>KFP requires you to create your workflow components from scratch, and requires a user to explicitly indicate which artifacts should be passed between components and in what way.</li><li>TFX offers a number of prebuilt components and takes care of the artifact passing more implicitly.
</li></ul><p>When choosing between the two frameworks, you need to consider the trade-off between flexibility and programming overhead.</p><p>For simplicity, the workflows only contain three components: data ingestion, data preprocessing, and model training. Depending on the scenario, you can add a range of extra components, such as model evaluation and model deployment. This example focuses on the preprocessing component, because it demonstrates how to use Apache Beam in an ML workflow for efficient and parallel processing of your ML data.</p><p>The dataset consists of images paired with a textual caption describing the content of the image. These pairs are taken from a captions subset of the <a href=https://cocodataset.org/#home>MSCOCO 2014 dataset</a>. This multi-modal data (image and text) gives us the opportunity to experiment with preprocessing operations for both modalities.</p><h3 id=kubeflow-pipelines-kfp>Kubeflow pipelines (KFP)</h3><p>To run our ML workflow with KFP, we must perform three steps:</p><ol><li>Create the KFP components by specifying the interface to the components and by writing and containerizing the implementation of the component logic.</li><li>Create the KFP pipeline by connecting the created components, specifying how inputs and outputs should be passed between components, and compiling the pipeline definition into a pipeline specification file.</li><li>Run the KFP pipeline by submitting it to a KFP client endpoint.</li></ol><p>The full example code can be found in the <a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/ml-orchestration/kfp>GitHub repository</a>.</p><h4 id=create-the-kfp-components>Create the KFP components</h4><p>The following diagram shows our target file structure:</p><pre><code> kfp
├── pipeline.py
├── components
│ ├── ingestion
│ │ ├── Dockerfile
│ │ ├── component.yaml
│ │ ├── requirements.txt
│ │ └── src
│ │ └── ingest.py
│ ├── preprocessing
│ │ ├── Dockerfile
│ │ ├── component.yaml
│ │ ├── requirements.txt
│ │ └── src
│ │ └── preprocess.py
│ └── train
│ ├── Dockerfile
│ ├── component.yaml
│ ├── requirements.txt
│ └── src
│ └── train.py
└── requirements.txt
</code></pre><p>The full preprocessing component specification is shown in the following snippet. The inputs are the path where the ingested dataset was saved by the ingest component and a path to a directory where the component can store artifacts. Additionally, some inputs specify how and where the Apache Beam pipeline runs. The specifications for the ingestion and train components are similar and can be found in the <a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/component.yaml>ingestion component.yaml</a> file and in the <a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/ml-orchestration/kfp/components/train/component.yaml>train component.yaml</a> file, respectively.</p><blockquote><p>Note: We are using the KFP v1 SDK, because v2 is still in <a href=https://www.kubeflow.org/docs/started/support/#application-status>beta</a>. The v2 SDK introduces some new options for specifying the component interface with more native support for input and output artifacts. To see how to migrate components from v1 to v2, consult the <a href=https://www.kubeflow.org/docs/components/pipelines/sdk-v2/v2-component-io/>KFP docs</a>.</p></blockquote><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>name: preprocessing
description: Component that mimicks scraping data from the web and outputs it to a jsonlines format file
inputs:
- name: ingested_dataset_path
description: source uri of the data to scrape
type: String
- name: base_artifact_path
description: base path to store data
type: String
- name: gcp_project_id
description: ID for the google cloud project to deploy the pipeline to.
type: String
- name: region
description: Region in which to deploy the Dataflow pipeline.
type: String
- name: dataflow_staging_root
description: Path to staging directory for the dataflow runner.
type: String
- name: beam_runner
description: Beam runner, DataflowRunner or DirectRunner.
type: String
outputs:
- name: preprocessed_dataset_path
description: target uri for the ingested dataset
type: String
implementation:
container:
image: &lt;your-docker-registry/preprocessing-image-name:latest&gt;
command: [
python3,
preprocess.py,
--ingested-dataset-path,
{inputValue: ingested_dataset_path},
--base-artifact-path,
{inputValue: base_artifact_path},
--preprocessed-dataset-path,
{outputPath: preprocessed_dataset_path},
--gcp-project-id,
{inputValue: gcp_project_id},
--region,
{inputValue: region},
--dataflow-staging-root,
{inputValue: dataflow_staging_root},
--beam-runner,
{inputValue: beam_runner},
]</code></pre></div></div><p>In this case, each component shares an identical Dockerfile, but you can add extra component-specific dependencies where needed.</p><div class='language-Dockerfile snippet'><div class="notebook-skip code-snippet"><a target=_blank type=button data-bs-toggle=tooltip data-bs-placement=bottom title="View source code" href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile><img src=/images/code-icon.svg></a>
<a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-Dockerfile data-lang=Dockerfile><span class=line><span class=cl><span class=k>FROM</span><span class=s> python:3.9-slim</span><span class=err>
</span></span></span><span class=line><span class=cl><span class=err>
</span></span></span><span class=line><span class=cl><span class=err></span><span class=c># (Optional) install extra dependencies</span><span class=err>
</span></span></span><span class=line><span class=cl><span class=err>
</span></span></span><span class=line><span class=cl><span class=err></span><span class=c># install pypi dependencies</span><span class=err>
</span></span></span><span class=line><span class=cl><span class=err></span><span class=k>COPY</span> requirements.txt /<span class=err>
</span></span></span><span class=line><span class=cl><span class=err></span><span class=k>RUN</span> python3 -m pip install --no-cache-dir -r requirements.txt<span class=err>
</span></span></span><span class=line><span class=cl><span class=err>
</span></span></span><span class=line><span class=cl><span class=err></span><span class=c># copy src files and set working directory</span><span class=err>
</span></span></span><span class=line><span class=cl><span class=err></span><span class=k>COPY</span> src /src<span class=err>
</span></span></span><span class=line><span class=cl><span class=err></span><span class=k>WORKDIR</span><span class=s> /src</span></span></span></code></pre></div></div></div><p>With the component specification and containerization done, you can implement the preprocessing component.</p><p>Because KFP provides the input and output arguments as command-line arguments, an argument parser (<code>argparse.ArgumentParser</code>) is needed.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>def parse_args():
&#34;&#34;&#34;Parse preprocessing arguments.&#34;&#34;&#34;
parser = argparse.ArgumentParser()
parser.add_argument(
&#34;--ingested-dataset-path&#34;,
type=str,
help=&#34;Path to the ingested dataset&#34;,
required=True)
parser.add_argument(
&#34;--preprocessed-dataset-path&#34;,
type=str,
help=&#34;The target directory for the ingested dataset.&#34;,
required=True)
parser.add_argument(
&#34;--base-artifact-path&#34;,
type=str,
help=&#34;Base path to store pipeline artifacts.&#34;,
required=True)
parser.add_argument(
&#34;--gcp-project-id&#34;,
type=str,
help=&#34;ID for the google cloud project to deploy the pipeline to.&#34;,
required=True)
parser.add_argument(
&#34;--region&#34;,
type=str,
help=&#34;Region in which to deploy the pipeline.&#34;,
required=True)
parser.add_argument(
&#34;--dataflow-staging-root&#34;,
type=str,
help=&#34;Path to staging directory for dataflow.&#34;,
required=True)
parser.add_argument(
&#34;--beam-runner&#34;,
type=str,
help=&#34;Beam runner: DataflowRunner or DirectRunner.&#34;,
default=&#34;DirectRunner&#34;)
return parser.parse_args()</code></pre></div></div><p>The implementation of the <code>preprocess_dataset</code> function contains the Apache Beam pipeline code and the Beam pipeline options that select the runner. The executed preprocessing involves downloading the image bytes from their URL, converting them to a PyTorch tensor, and resizing to the desired size. The caption undergoes a series of string manipulations to ensure that our model receives uniform image descriptions. Tokenization is not done here, but it could be added at this stage if the vocabulary is known. Finally, each element is serialized and written to <a href=https://avro.apache.org/docs/>Avro</a> files. You can use alternative file formats, such as TFRecords.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code># We use the save_main_session option because one or more DoFn&#39;s in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(
runner=beam_runner,
project=gcp_project_id,
job_name=f&#39;preprocessing-{int(time.time())}&#39;,
temp_location=dataflow_staging_root,
region=region,
requirements_file=&#34;/requirements.txt&#34;,
save_main_session=True,
)
with beam.Pipeline(options=pipeline_options) as pipeline:
(
pipeline
| &#34;Read input jsonlines file&#34; &gt;&gt;
beam.io.ReadFromText(ingested_dataset_path)
| &#34;Load json&#34; &gt;&gt; beam.Map(json.loads)
| &#34;Filter licenses&#34; &gt;&gt; beam.Filter(valid_license)
| &#34;Download image from URL&#34; &gt;&gt; beam.FlatMap(download_image_from_url)
| &#34;Resize image&#34; &gt;&gt; beam.Map(resize_image, size=IMAGE_SIZE)
| &#34;Clean Text&#34; &gt;&gt; beam.Map(clean_text)
| &#34;Serialize Example&#34; &gt;&gt; beam.Map(serialize_example)
| &#34;Write to Avro files&#34; &gt;&gt; beam.io.WriteToAvro(
file_path_prefix=target_path,
schema={
&#34;namespace&#34;: &#34;preprocessing.example&#34;,
&#34;type&#34;: &#34;record&#34;,
&#34;name&#34;: &#34;Sample&#34;,
&#34;fields&#34;: [{
&#34;name&#34;: &#34;id&#34;, &#34;type&#34;: &#34;int&#34;
}, {
&#34;name&#34;: &#34;caption&#34;, &#34;type&#34;: &#34;string&#34;
}, {
&#34;name&#34;: &#34;image&#34;, &#34;type&#34;: &#34;bytes&#34;
}]
},
file_name_suffix=&#34;.avro&#34;))</code></pre></div></div><p>It also contains the necessary code to perform the component I/O. First, a target path is constructed to store the preprocessed dataset based on the component input parameter <code>base_artifact_path</code> and a timestamp. Output values from components are only returned as files, so we write the value of the constructed target path to an output file that was provided to our component by KFP.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>timestamp = time.time()
target_path = f&#34;{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}&#34;
# the directory where the output file is created may or may not exists
# so we have to create it.
Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True)
with open(preprocessed_dataset_path, &#39;w&#39;) as f:
f.write(target_path)</code></pre></div></div><p>Because we are mainly interested in the preprocessing component to show how a Beam pipeline can be integrated into a larger ML workflow, this section doesn&rsquo;t cover the implementation of the ingestion and train components in depth. Implementations of dummy components that mock their behavior are provided in the full example code.</p><h4 id=create-the-pipeline-definition>Create the pipeline definition</h4><p><code>pipeline.py</code> first loads the created components from their specification <code>.yaml</code> file.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code># load the kfp components from their yaml files
DataIngestOp = comp.load_component(&#39;components/ingestion/component.yaml&#39;)
DataPreprocessingOp = comp.load_component(
&#39;components/preprocessing/component.yaml&#39;)
TrainModelOp = comp.load_component(&#39;components/train/component.yaml&#39;)</code></pre></div></div><p>After that, the pipeline is created, and the required component inputs and outputs are specified manually.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>@dsl.pipeline(
pipeline_root=PIPELINE_ROOT,
name=&#34;beam-preprocessing-kfp-example&#34;,
description=&#34;Pipeline to show an apache beam preprocessing example in KFP&#34;)
def pipeline(
gcp_project_id: str,
region: str,
component_artifact_root: str,
dataflow_staging_root: str,
beam_runner: str):
&#34;&#34;&#34;KFP pipeline definition.
Args:
gcp_project_id (str): ID for the google cloud project to deploy the pipeline to.
region (str): Region in which to deploy the pipeline.
component_artifact_root (str): Path to artifact repository where Kubeflow Pipelines
components can store artifacts.
dataflow_staging_root (str): Path to staging directory for the dataflow runner.
beam_runner (str): Beam runner: DataflowRunner or DirectRunner.
&#34;&#34;&#34;
ingest_data_task = DataIngestOp(base_artifact_path=component_artifact_root)
data_preprocessing_task = DataPreprocessingOp(
ingested_dataset_path=ingest_data_task.outputs[&#34;ingested_dataset_path&#34;],
base_artifact_path=component_artifact_root,
gcp_project_id=gcp_project_id,
region=region,
dataflow_staging_root=dataflow_staging_root,
beam_runner=beam_runner)
train_model_task = TrainModelOp(
preprocessed_dataset_path=data_preprocessing_task.
outputs[&#34;preprocessed_dataset_path&#34;],
base_artifact_path=component_artifact_root)</code></pre></div></div><p>Finally, the defined pipeline is compiled, and a <code>pipeline.json</code> specification file is generated.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>Compiler().compile(pipeline_func=pipeline, package_path=&#34;pipeline.json&#34;)</code></pre></div></div><h4 id=run-the-kfp-pipeline>Run the KFP pipeline</h4><p>With the necessary <a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/ml-orchestration/kfp/requirements.txt>requirements</a> installed, you can now run the pipeline by using the generated specification file and the following snippet. In the snippet, <code>run_arguments</code> is a dictionary that maps the pipeline parameters (<code>gcp_project_id</code>, <code>region</code>, <code>component_artifact_root</code>, <code>dataflow_staging_root</code>, and <code>beam_runner</code>) to their values. Consult the <a href=https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html#kfp.Client.run_pipeline><code>run_pipeline</code> documentation</a> for more information. Before running the pipeline, a container for each component must be built and pushed to a container registry that your pipeline can access. Also, the component specification <code>.yaml</code> files must point to the correct container image.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>client = kfp.Client()
experiment = client.create_experiment(&#34;KFP orchestration example&#34;)
run_result = client.run_pipeline(
experiment_id=experiment.id,
job_name=&#34;KFP orchestration job&#34;,
pipeline_package_path=&#34;pipeline.json&#34;,
params=run_arguments)</code></pre></div></div><h3 id=tensorflow-extended-tfx>Tensorflow Extended (TFX)</h3><p>Working with TFX is similar to the approach for KFP illustrated previously: Define the individual workflow components, connect them in a pipeline object, and run the pipeline in the target environment. What makes TFX different is that it already provides a set of Python libraries for creating workflow components. Unlike with the KFP example, you don&rsquo;t need to start from scratch by writing and containerizing the code.</p><p>With TFX, you need to choose which TFX components are relevant to your workflow and use the library to adapt their functionality to your use case. The following diagram shows the available components and their corresponding libraries.</p><p><img src=https://www.tensorflow.org/static/tfx/guide/images/libraries_components.png alt="TFX libraries and components"></p><p>TFX relies heavily on Apache Beam to implement data-parallel pipelines in these libraries. You need to run components created with these libraries with one of the supported Apache Beam runners. The full TFX example code can again be found in the <a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/ml-orchestration/tfx>GitHub repository</a>.</p><p>For the KFP example, we used ingestion, preprocessing, and trainer components. In this TFX example, we use the ExampleGen, Transform, and Trainer components, together with the StatisticsGen and SchemaGen components, which infer the data schema that the Transform component requires.</p><p>First, review the pipeline definition. Note that this definition looks similar to our previous example.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>def create_pipeline(
gcp_project_id,
region,
pipeline_name,
pipeline_root,
csv_file,
module_file,
beam_runner,
metadata_file):
&#34;&#34;&#34;Create the TFX pipeline.
Args:
gcp_project_id (str): ID for the google cloud project to deploy the pipeline to.
region (str): Region in which to deploy the pipeline.
pipeline_name (str): Name for the Beam pipeline
pipeline_root (str): Path to artifact repository where TFX
stores a pipeline’s artifacts.
csv_file (str): Path to the csv input file.
module_file (str): Path to module file containing the preprocessing_fn and run_fn.
beam_runner (str): Beam runner: DataflowRunner or DirectRunner.
metadata_file (str): Path to store a metadata file as a mock metadata database.
&#34;&#34;&#34;
example_gen = tfx.components.CsvExampleGen(input_base=csv_file)
# Computes statistics over data for visualization and example validation.
statistics_gen = tfx.components.StatisticsGen(
examples=example_gen.outputs[&#39;examples&#39;])
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs[&#39;statistics&#39;], infer_feature_shape=True)
transform = tfx.components.Transform(
examples=example_gen.outputs[&#39;examples&#39;],
schema=schema_gen.outputs[&#39;schema&#39;],
module_file=module_file)
trainer = tfx.components.Trainer(
module_file=module_file,
examples=transform.outputs[&#39;transformed_examples&#39;],
transform_graph=transform.outputs[&#39;transform_graph&#39;])
components = [example_gen, statistics_gen, schema_gen, transform, trainer]
beam_pipeline_args_by_runner = {
&#39;DirectRunner&#39;: [],
&#39;DataflowRunner&#39;: [
&#39;--runner=DataflowRunner&#39;,
&#39;--project=&#39; + gcp_project_id,
&#39;--temp_location=&#39; + os.path.join(pipeline_root, &#39;tmp&#39;),
&#39;--region=&#39; + region,
]
}
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=components,
enable_cache=True,
metadata_connection_config=tfx.orchestration.metadata.
sqlite_metadata_connection_config(metadata_file),
beam_pipeline_args=beam_pipeline_args_by_runner[beam_runner])</code></pre></div></div><p>We use the same data input, that is, a couple of image-caption pairs extracted from the <a href=https://cocodataset.org/#home>MSCOCO 2014 dataset</a>. This time, however, we use the data in CSV format, because the ExampleGen component does not support jsonlines by default. The formats that are supported out of the box are listed in the <a href=https://www.tensorflow.org/tfx/guide/examplegen#data_sources_and_formats>Data Sources and Formats</a> section of the TFX documentation. Alternatively, you can write a <a href=https://www.tensorflow.org/tfx/guide/examplegen#custom_examplegen>custom ExampleGen</a>.</p><p>Copy the snippet below to an input data CSV file:</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>image_id,id,caption,image_url,image_name,image_license
318556,255,&#34;An angled view of a beautifully decorated bathroom.&#34;,&#34;http://farm4.staticflickr.com/3133/3378902101_3c9fa16b84_z.jpg&#34;,&#34;COCO_train2014_000000318556.jpg&#34;,&#34;Attribution-NonCommercial-ShareAlike License&#34;
476220,14,&#34;An empty kitchen with white and black appliances.&#34;,&#34;http://farm7.staticflickr.com/6173/6207941582_b69380c020_z.jpg&#34;,&#34;COCO_train2014_000000476220.jpg&#34;,&#34;Attribution-NonCommercial License&#34;</code></pre></div></div><p>So far, we have only imported standard TFX components and chained them together into a pipeline. Both the Transform and the Trainer components have a <code>module_file</code> argument defined. That’s where we define the behavior we want from these standard components.</p><h4 id=preprocess>Preprocess</h4><p>The Transform component searches the <code>module_file</code> for a definition of the function <code>preprocessing_fn</code>. This function is the central concept of the <code>tf.transform</code> library. The <a href=https://www.tensorflow.org/tfx/transform/get_started#define_a_preprocessing_function>TFX documentation</a> describes this function:</p><blockquote><p>The preprocessing function is the most important concept of tf.Transform. The preprocessing function is a logical description of a transformation of the dataset. The preprocessing function accepts and returns a dictionary of tensors, where a tensor means Tensor or SparseTensor. There are two kinds of functions used to define the preprocessing function:</p><ol><li>Any function that accepts and returns tensors. These add TensorFlow operations to the graph that transform raw data into transformed data.</li><li>Any of the analyzers provided by tf.Transform. Analyzers also accept and return tensors, but unlike TensorFlow functions, they do not add operations to the graph. Instead, analyzers cause tf.Transform to compute a full-pass operation outside of TensorFlow. They use the input tensor values over the entire dataset to generate a constant tensor that is returned as the output. For example, tft.min computes the minimum of a tensor over the dataset. 
tf.Transform provides a fixed set of analyzers, but this will be extended in future versions.</li></ol></blockquote><p>Therefore, <code>preprocessing_fn</code> can contain all TensorFlow operations that accept and return tensors, as well as specific <code>tf.transform</code> operations. In the following example, we use the former to convert all incoming captions to lowercase, while the latter does a full pass over all the data in our dataset to compute the average caption length, which can then be used in a follow-up preprocessing step.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>def preprocessing_fn(inputs):
&#34;&#34;&#34;Transform raw data.&#34;&#34;&#34;
# convert the captions to lowercase
lower = tf.strings.lower(inputs[&#39;caption&#39;])
# compute the mean caption length during a full pass
# over the dataset (a tf.Transform analyzer).
mean_length = tft.mean(tf.strings.length(lower))
# &lt;do some preprocessing with the mean length&gt;
return {
&#39;caption_lower&#39;: lower,
}</code></pre></div></div><p>This function only defines the logical steps that must be performed during preprocessing. The function needs a concrete implementation before it can run. One such implementation is provided by <code>tf.Transform</code> using Apache Beam, which provides a PTransform <code>tft_beam.AnalyzeAndTransformDataset</code> to process the data. We can test this <code>preprocessing_fn</code> outside of the TFX Transform component by using this PTransform explicitly. Calling the <code>preprocessing_fn</code> in this way is not necessary when using <code>tf.Transform</code> in combination with the TFX Transform component.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>if __name__ == &#34;__main__&#34;:
# Test preprocessing_fn directly without the TFX pipeline
raw_data = [
{
&#34;caption&#34;: &#34;A bicycle replica with a clock as the front wheel.&#34;
}, {
&#34;caption&#34;: &#34;A black Honda motorcycle parked in front of a garage.&#34;
}, {
&#34;caption&#34;: &#34;A room with blue walls and a white sink and door.&#34;
}
]
# define the feature_spec (in a tfx pipeline this would be generated by a SchemaGen component)
feature_spec = dict(caption=tf.io.FixedLenFeature([], tf.string))
raw_data_metadata = tft.DatasetMetadata.from_feature_spec(feature_spec)
# test out the Beam implementation of the
# preprocessing_fn with AnalyzeAndTransformDataset
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
transformed_dataset, transform_fn = (
(raw_data, raw_data_metadata)
| tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset</code></pre></div></div><h4 id=train>Train</h4><p>The Trainer component behaves like the Transform component, but instead of looking for a <code>preprocessing_fn</code>, it requires a <code>run_fn</code> function in the specified <code>module_file</code>. Our simple implementation creates a stub model using <code>tf.keras</code> and saves the resulting model to the serving model directory (<code>fn_args.serving_model_dir</code>).</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>def run_fn(fn_args: tfx.components.FnArgs) -&gt; None:
&#34;&#34;&#34;Build the TF model, train it and export it.&#34;&#34;&#34;
# create a model
model = tf.keras.Sequential()
model.add(tf.keras.layers.Dense(1, input_dim=10))
model.compile()
# train the model on the preprocessed data
# model.fit(...)
# Save model to fn_args.serving_model_dir.
model.save(fn_args.serving_model_dir)</code></pre></div></div><h4 id=executing-the-pipeline>Executing the pipeline</h4><p>To launch the pipeline, provide two configurations: the orchestrator for the TFX pipeline and the pipeline options for running the Apache Beam pipelines. To run the pipeline locally without additional setup, this example uses the <code>LocalDagRunner</code> for orchestration. The created pipeline can specify Apache Beam’s pipeline options through the <code>beam_pipeline_args</code> argument.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>args = parse_args()
tfx.orchestration.LocalDagRunner().run(create_pipeline(**vars(args)))</code></pre></div></div><div class=feedback><p class=update>Last updated on 2024/05/03</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div 
class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>