blob: c29030f857bffd56389f9a9e4cd69b6fc79c7b8b [file] [log] [blame]
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Design Your Pipeline</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<script type=text/javascript src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script>
<script type=text/javascript src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script>
<script type=text/javascript src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script>
<script type=text/javascript src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script>
<script type=text/javascript src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script>
<script type=text/javascript src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script>
<script type=text/javascript src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script>
<script type=text/javascript src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script>
<script type=text/javascript src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/pipelines/design-your-pipeline/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","//www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script><script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class=navbar-link href=/get-started/>Get Started</a>
<a class=navbar-link href=/documentation/>Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/pipelines/design-your-pipeline.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class=navbar-link href=/community/>Community</a>
<a class=navbar-link href=/contribute/>Contribute</a>
<a class=navbar-link href=/blog/>Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/pipelines/design-your-pipeline.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>function showSearch(){addPlaceholder();var e,t=document.querySelector(".searchBar");t.classList.remove("disappear"),e=document.querySelector("#iconsBar"),e.classList.add("disappear")}function addPlaceholder(){$("input:text").attr("placeholder","What are you looking for?")}function endSearch(){var e,t=document.querySelector(".searchBar");t.classList.add("disappear"),e=document.querySelector("#iconsBar"),e.classList.remove("disappear")}function blockScroll(){$("body").toggleClass("fixedPosition")}function openMenu(){addPlaceholder(),blockScroll()}</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Concepts</span><ul class=section-nav-list><li><a href=/documentation/basics/>Basics of the Beam model</a></li><li><a href=/documentation/runtime/model/>How Beam executes a pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Beam programming guide</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/>Overview</a></li><li><a href=/documentation/programming-guide/#creating-a-pipeline>Pipelines</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>PCollections</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pcollections>Creating a PCollection</a></li><li><a href=/documentation/programming-guide/#pcollection-characteristics>PCollection characteristics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#applying-transforms>Applying transforms</a></li><li><span class=section-nav-list-title>Core Beam transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pardo>ParDo</a></li><li><a href=/documentation/programming-guide/#groupbykey>GroupByKey</a></li><li><a href=/documentation/programming-guide/#cogroupbykey>CoGroupByKey</a></li><li><a href=/documentation/programming-guide/#combine>Combine</a></li><li><a href=/documentation/programming-guide/#flatten>Flatten</a></li><li><a href=/documentation/programming-guide/#partition>Partition</a></li></ul></li><li><a href=/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms>Requirements for user code</a></li><li><a href=/documentation/programming-guide/#side-inputs>Side inputs</a></li><li><a href=/documentation/programming-guide/#additional-outputs>Additional outputs</a></li><li><a href=/documentation/programming-guide/#composite-transforms>Composite transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline I/O</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pipeline-io>Using I/O transforms</a></li><li><a href=/documentation/io/connectors/>I/O connectors</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>I/O connector guides</span><ul class=section-nav-list><li><a href=/documentation/io/built-in/parquet/>Apache Parquet I/O connector</a></li><li><a href=/documentation/io/built-in/hadoop/>Hadoop Input/Output Format IO</a></li><li><a href=/documentation/io/built-in/hcatalog/>HCatalog IO</a></li><li><a href=/documentation/io/built-in/google-bigquery/>Google BigQuery I/O connector</a></li><li><a href=/documentation/io/built-in/snowflake/>Snowflake I/O connector</a></li><li><a href=/documentation/io/built-in/cdap/>CDAP I/O connector</a></li><li><a href=/documentation/io/built-in/sparkreceiver/>Spark Receiver I/O connector</a></li><li><a href=/documentation/io/built-in/singlestore/>SingleStoreDB I/O connector</a></li><li><a href=/documentation/io/built-in/webapis/>Web APIs I/O connector</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Developing new I/O connectors</span><ul class=section-nav-list><li><a href=/documentation/io/developing-io-overview/>Overview: Developing connectors</a></li><li><a href=/documentation/io/developing-io-java/>Developing connectors (Java)</a></li><li><a href=/documentation/io/developing-io-python/>Developing connectors (Python)</a></li><li><a href=/documentation/io/io-standards/>I/O Standards</a></li></ul></li><li><a href=/documentation/io/testing/>Testing I/O transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Schemas</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#what-is-a-schema>What is a schema</a></li><li><a href=/documentation/programming-guide/#schemas-for-pl-types>Schemas for programming language types</a></li><li><a href=/documentation/programming-guide/#schema-definition>Schema definition</a></li><li><a href=/documentation/programming-guide/#logical-types>Logical types</a></li><li><a href=/documentation/programming-guide/#creating-schemas>Creating schemas</a></li><li><a href=/documentation/programming-guide/#using-schemas>Using schemas</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data encoding and type safety</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#data-encoding-and-type-safety>Data encoding basics</a></li><li><a href=/documentation/programming-guide/#specifying-coders>Specifying coders</a></li><li><a href=/documentation/programming-guide/#default-coders-and-the-coderregistry>Default coders and the CoderRegistry</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Windowing</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#windowing>Windowing basics</a></li><li><a href=/documentation/programming-guide/#provided-windowing-functions>Provided windowing functions</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Setting your PCollection’s windowing function</a></li><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Watermarks and late data</a></li><li><a href=/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements>Adding timestamps to a PCollection’s elements</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Triggers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#triggers>Trigger basics</a></li><li><a href=/documentation/programming-guide/#event-time-triggers>Event time triggers and the default trigger</a></li><li><a href=/documentation/programming-guide/#processing-time-triggers>Processing time triggers</a></li><li><a href=/documentation/programming-guide/#data-driven-triggers>Data-driven triggers</a></li><li><a href=/documentation/programming-guide/#setting-a-trigger>Setting a trigger</a></li><li><a href=/documentation/programming-guide/#composite-triggers>Composite triggers</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Metrics</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#metrics>Metrics basics</a></li><li><a href=/documentation/programming-guide/#types-of-metrics>Types of metrics</a></li><li><a href=/documentation/programming-guide/#querying-metrics>Querying metrics</a></li><li><a href=/documentation/programming-guide/#using-metrics>Using metrics in pipeline</a></li><li><a href=/documentation/programming-guide/#export-metrics>Export metrics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>State and Timers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#types-of-state>Types of state</a></li><li><a href=/documentation/programming-guide/#deferred-state-reads>Deferred state reads</a></li><li><a href=/documentation/programming-guide/#timers>Timers</a></li><li><a href=/documentation/programming-guide/#garbage-collecting-state>Garbage collecting state</a></li><li><a href=/documentation/programming-guide/#state-timers-examples>State and timers examples</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Splittable DoFns</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#sdf-basics>Basics</a></li><li><a href=/documentation/programming-guide/#sizing-and-progress>Sizing and progress</a></li><li><a href=/documentation/programming-guide/#user-initiated-checkpoint>User-initiated checkpoint</a></li><li><a href=/documentation/programming-guide/#runner-initiated-split>Runner initiated split</a></li><li><a href=/documentation/programming-guide/#watermark-estimation>Watermark estimation</a></li><li><a href=/documentation/programming-guide/#truncating-during-drain>Truncating during drain</a></li><li><a href=/documentation/programming-guide/#bundle-finalization>Bundle finalization</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Multi-language Pipelines</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#create-x-lang-transforms>Creating cross-language transforms</a></li><li><a href=/documentation/programming-guide/#use-x-lang-transforms>Using cross-language transforms</a></li><li><a href=/documentation/programming-guide/#x-lang-transform-runner-support>Runner Support</a></li></ul></li><li><a href=/documentation/programming-guide/#batched-dofns>Batched DoFns</a></li><li><a href=/documentation/programming-guide/#transform-service>Transform service</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline development lifecycle</span><ul class=section-nav-list><li><a href=/documentation/pipelines/design-your-pipeline/>Design Your Pipeline</a></li><li><a href=/documentation/pipelines/create-your-pipeline/>Create Your Pipeline</a></li><li><a href=/documentation/pipelines/test-your-pipeline/>Test Your Pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Common pipeline patterns</span><ul class=section-nav-list><li><a href=/documentation/patterns/overview/>Overview</a></li><li><a href=/documentation/patterns/file-processing/>File processing</a></li><li><a href=/documentation/patterns/side-inputs/>Side inputs</a></li><li><a href=/documentation/patterns/pipeline-options/>Pipeline options</a></li><li><a href=/documentation/patterns/custom-io/>Custom I/O</a></li><li><a href=/documentation/patterns/custom-windows/>Custom windows</a></li><li><a href=/documentation/patterns/bigqueryio/>BigQueryIO</a></li><li><a href=/documentation/patterns/ai-platform/>AI Platform</a></li><li><a href=/documentation/patterns/schema/>Schema</a></li><li><a href=/documentation/patterns/bqml/>BigQuery ML</a></li><li><a href=/documentation/patterns/grouping-elements-for-efficient-external-service-calls/>Grouping elements for efficient external service calls</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>AI/ML pipelines</span><ul class=section-nav-list><li><a href=/documentation/ml/overview/>Get started with AI/ML</a></li><li><a href=/documentation/ml/about-ml/>About Beam ML</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Prediction and inference</span><ul class=section-nav-list><li><a href=/documentation/ml/inference-overview/>Overview</a></li><li><a href=/documentation/ml/multi-model-pipelines/>Build a pipeline with multiple models</a></li><li><a href=/documentation/ml/tensorrt-runinference>Build a custom model handler with TensorRT</a></li><li><a href=/documentation/ml/large-language-modeling>Use LLM inference</a></li><li><a href=/documentation/ml/multi-language-inference/>Build a multi-language inference pipeline</a></li><li><a href=/documentation/ml/side-input-updates/>Update your model in production</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data processing</span><ul class=section-nav-list><li><a href=/documentation/ml/preprocess-data/>Preprocess data</a></li><li><a href=/documentation/ml/data-processing/>Explore your data</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Workflow orchestration</span><ul class=section-nav-list><li><a href=/documentation/ml/orchestration/>Use ML-OPS workflow orchestrators</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Model training</span><ul class=section-nav-list><li><a href=/documentation/ml/per-entity-training>Per-entity training</a></li><li><a href=/documentation/ml/online-clustering/>Online clustering</a></li><li><a href=/documentation/ml/model-evaluation/>ML model evaluation</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Use cases</span><ul class=section-nav-list><li><a href=/documentation/ml/anomaly-detection/>Build an anomaly detection pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Reference</span><ul class=section-nav-list><li><a href=/documentation/ml/runinference-metrics/>RunInference metrics</a></li><li><a href=/documentation/ml/model-evaluation/>Model validation</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Runtime systems</span><ul class=section-nav-list><li><a href=/documentation/runtime/environments/>Container environments</a></li><li><a href=/documentation/runtime/resource-hints/>Resource hints</a></li><li><a href=/documentation/runtime/sdk-harness-config/>SDK Harness Configuration</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transform catalog</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Python</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Enrichment</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/enrichment/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-bigtable/>Bigtable example</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-vertexai/>Vertex AI Feature Store examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/python/elementwise/flatmap/>FlatMap</a></li><li><a href=/documentation/transforms/python/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/python/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/python/elementwise/map/>Map</a></li><li><a href=/documentation/transforms/python/elementwise/mltransform/>MLTransform</a></li><li><a href=/documentation/transforms/python/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/python/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/python/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/python/elementwise/reify/>Reify</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>RunInference</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/runinference/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-pytorch/>PyTorch examples</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-sklearn/>Sklearn examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/python/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/python/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/python/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/python/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/combineglobally/>CombineGlobally</a></li><li><a href=/documentation/transforms/python/aggregation/combineperkey/>CombinePerKey</a></li><li><a href=/documentation/transforms/python/aggregation/combinevalues/>CombineValues</a></li><li><a href=/documentation/transforms/python/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/python/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/python/aggregation/groupby/>GroupBy</a></li><li><a href=/documentation/transforms/python/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/python/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/python/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/python/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/python/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/python/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/python/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/python/aggregation/top/>Top</a></li><li><a href=/documentation/transforms/python/aggregation/tolist/>ToList</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/other/create/>Create</a></li><li><a href=/documentation/transforms/python/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/python/other/reshuffle/>Reshuffle</a></li><li><a href=/documentation/transforms/python/other/windowinto/>WindowInto</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Java</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/java/elementwise/flatmapelements/>FlatMapElements</a></li><li><a href=/documentation/transforms/java/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/java/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/java/elementwise/mapelements/>MapElements</a></li><li><a href=/documentation/transforms/java/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/java/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/java/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/java/elementwise/reify/>Reify</a></li><li><a href=/documentation/transforms/java/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/java/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/java/elementwise/withkeys/>WithKeys</a></li><li><a href=/documentation/transforms/java/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/java/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/java/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/combine/>Combine</a></li><li><a href=/documentation/transforms/java/aggregation/combinewithcontext/>CombineWithContext</a></li><li><a href=/documentation/transforms/java/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/java/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/java/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/java/aggregation/hllcount/>HllCount</a></li><li><a href=/documentation/transforms/java/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/java/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/java/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/java/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/java/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/java/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/java/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/other/create/>Create</a></li><li><a href=/documentation/transforms/java/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/java/other/passert/>PAssert</a></li><li><a href=/documentation/transforms/java/other/view/>View</a></li><li><a href=/documentation/transforms/java/other/window/>Window</a></li></ul></li></ul></li></ul></li><li><a href=/documentation/glossary/>Glossary</a></li><li><a href=https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam>Beam Wiki <img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#what-to-consider-when-designing-your-pipeline>What to consider when designing your pipeline</a></li><li><a href=#a-basic-pipeline>A basic pipeline</a></li><li><a href=#branching-pcollections>Branching PCollections</a><ul><li><a href=#multiple-transforms-process-the-same-pcollection>Multiple transforms process the same PCollection</a></li><li><a href=#a-single-transform-that-produces-multiple-outputs>A single transform that produces multiple outputs</a></li></ul></li><li><a href=#merging-pcollections>Merging PCollections</a></li><li><a href=#multiple-sources>Multiple sources</a></li><li><a href=#whats-next>What&rsquo;s next</a></li></ul></nav></nav><div class="body__contained body__section-nav arrow-list arrow-list--no-mt"><h1 id=design-your-pipeline>Design Your Pipeline</h1><nav id=TableOfContents><ul><li><a href=#what-to-consider-when-designing-your-pipeline>What to consider when designing your pipeline</a></li><li><a href=#a-basic-pipeline>A basic pipeline</a></li><li><a href=#branching-pcollections>Branching PCollections</a><ul><li><a href=#multiple-transforms-process-the-same-pcollection>Multiple transforms process the same PCollection</a></li><li><a href=#a-single-transform-that-produces-multiple-outputs>A single transform that produces multiple outputs</a></li></ul></li><li><a href=#merging-pcollections>Merging PCollections</a></li><li><a href=#multiple-sources>Multiple sources</a></li><li><a href=#whats-next>What&rsquo;s next</a></li></ul></nav><p>This page helps you design your Apache Beam pipeline. It includes information about how to determine your pipeline&rsquo;s structure, how to choose which transforms to apply to your data, and how to determine your input and output methods.</p><p>Before reading this section, it is recommended that you become familiar with the information in the <a href=/documentation/programming-guide>Beam programming guide</a>.</p><h2 id=what-to-consider-when-designing-your-pipeline>What to consider when designing your pipeline</h2><p>When designing your Beam pipeline, consider a few basic questions:</p><ul><li><strong>Where is your input data stored?</strong> How many sets of input data do you have? This will determine what kinds of <code>Read</code> transforms you&rsquo;ll need to apply at the start of your pipeline.</li><li><strong>What does your data look like?</strong> It might be plaintext, formatted log files, or rows in a database table. Some Beam transforms work exclusively on <code>PCollection</code>s of key/value pairs; you&rsquo;ll need to determine if and how your data is keyed and how to best represent that in your pipeline&rsquo;s <code>PCollection</code>(s).</li><li><strong>What do you want to do with your data?</strong> The core transforms in the Beam SDKs are general purpose. Knowing how you need to change or manipulate your data will determine how you build core transforms like <a href=/documentation/programming-guide/#pardo>ParDo</a>, or when you use pre-written transforms included with the Beam SDKs.</li><li><strong>What does your output data look like, and where should it go?</strong> This will determine what kinds of <code>Write</code> transforms you&rsquo;ll need to apply at the end of your pipeline.</li></ul><h2 id=a-basic-pipeline>A basic pipeline</h2><p>The simplest pipelines represent a linear flow of operations, as shown in figure 1.</p><p><img src=/images/design-your-pipeline-linear.svg alt="A linear pipeline starts with one input collection, sequentially appliesthree transforms, and ends with one output collection."></p><p><em>Figure 1: A linear pipeline.</em></p><p>However, your pipeline can be significantly more complex. A pipeline represents a <a href=https://en.wikipedia.org/wiki/Directed_acyclic_graph>Directed Acyclic Graph</a> of steps. It can have multiple input sources, multiple output sinks, and its operations (<code>PTransform</code>s) can both read and output multiple <code>PCollection</code>s. The following examples show some of the different shapes your pipeline can take.</p><h2 id=branching-pcollections>Branching PCollections</h2><p>It&rsquo;s important to understand that transforms do not consume <code>PCollection</code>s; instead, they consider each individual element of a <code>PCollection</code> and create a new <code>PCollection</code> as output. This way, you can do different things to different elements in the same <code>PCollection</code>.</p><h3 id=multiple-transforms-process-the-same-pcollection>Multiple transforms process the same PCollection</h3><p>You can use the same <code>PCollection</code> as input for multiple transforms without consuming the input or altering it.</p><p>The pipeline in figure 2 is a branching pipeline. The pipeline reads its input (first names represented as strings) from a database table and creates a <code>PCollection</code> of table rows. Then, the pipeline applies multiple transforms to the <strong>same</strong> <code>PCollection</code>. Transform A extracts all the names in that <code>PCollection</code> that start with the letter &lsquo;A&rsquo;, and Transform B extracts all the names in that <code>PCollection</code> that start with the letter &lsquo;B&rsquo;. Both transforms A and B have the same input <code>PCollection</code>.</p><p><img src=/images/design-your-pipeline-multiple-pcollections.svg alt="The pipeline applies two transforms to a single input collection. Eachtransform produces an output collection."></p><p><em>Figure 2: A branching pipeline. Two transforms are applied to a single
PCollection of database table rows.</em></p><p>The following example code applies two transforms to a single input collection.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>dbRowCollection</span> <span class=o>=</span> <span class=o>...;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>aCollection</span> <span class=o>=</span> <span class=n>dbRowCollection</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;aTrans&#34;</span><span class=o>,</span> <span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;(){</span>
</span></span><span class=line><span class=cl> <span class=nd>@ProcessElement</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>processElement</span><span class=o>(</span><span class=n>ProcessContext</span> <span class=n>c</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=k>if</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>startsWith</span><span class=o>(</span><span class=s>&#34;A&#34;</span><span class=o>)){</span>
</span></span><span class=line><span class=cl> <span class=n>c</span><span class=o>.</span><span class=na>output</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>());</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>bCollection</span> <span class=o>=</span> <span class=n>dbRowCollection</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;bTrans&#34;</span><span class=o>,</span> <span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;(){</span>
</span></span><span class=line><span class=cl> <span class=nd>@ProcessElement</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>processElement</span><span class=o>(</span><span class=n>ProcessContext</span> <span class=n>c</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=k>if</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>startsWith</span><span class=o>(</span><span class=s>&#34;B&#34;</span><span class=o>)){</span>
</span></span><span class=line><span class=cl> <span class=n>c</span><span class=o>.</span><span class=na>output</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>());</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}));</span></span></span></code></pre></div></div></div><h3 id=a-single-transform-that-produces-multiple-outputs>A single transform that produces multiple outputs</h3><p>Another way to branch a pipeline is to have a <strong>single</strong> transform output to multiple <code>PCollection</code>s by using <a href=/documentation/programming-guide/#additional-outputs>tagged outputs</a>. Transforms that produce more than one output process each element of the input once, and output to zero or more <code>PCollection</code>s.</p><p>Figure 3 illustrates the same example described above, but with one transform that produces multiple outputs. Names that start with &lsquo;A&rsquo; are added to the main output <code>PCollection</code>, and names that start with &lsquo;B&rsquo; are added to an additional output <code>PCollection</code>.</p><p><img src=/images/design-your-pipeline-additional-outputs.svg alt="The pipeline applies one transform that produces multiple output collections."></p><p><em>Figure 3: A pipeline with a transform that outputs multiple PCollections.</em></p><p>If we compare the pipelines in figure 2 and figure 3, you can see they perform
the same operation in different ways. The pipeline in figure 2 contains two
transforms that process the elements in the same input <code>PCollection</code>. One
transform uses the following logic:</p><pre>if (starts with 'A') { outputToPCollectionA }</pre><p>while the other transform uses:</p><pre>if (starts with 'B') { outputToPCollectionB }</pre><p>Because each transform reads the entire input <code>PCollection</code>, each element in the input <code>PCollection</code> is processed twice.</p><p>The pipeline in figure 3 performs the same operation in a different way - with only one transform that uses the following logic:</p><pre>if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }</pre><p>where each element in the input <code>PCollection</code> is processed once.</p><p>The following example code applies one transform that processes each element
once and outputs two collections.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Define two TupleTags, one for each output.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=kd>final</span> <span class=n>TupleTag</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>startsWithATag</span> <span class=o>=</span> <span class=k>new</span> <span class=n>TupleTag</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;(){};</span>
</span></span><span class=line><span class=cl><span class=kd>final</span> <span class=n>TupleTag</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>startsWithBTag</span> <span class=o>=</span> <span class=k>new</span> <span class=n>TupleTag</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;(){};</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>PCollectionTuple</span> <span class=n>mixedCollection</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>dbRowCollection</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;()</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=nd>@ProcessElement</span>
</span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>processElement</span><span class=o>(</span><span class=n>ProcessContext</span> <span class=n>c</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=k>if</span> <span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>startsWith</span><span class=o>(</span><span class=s>&#34;A&#34;</span><span class=o>))</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=c1>// Emit to main output, which is the output with tag startsWithATag.
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=n>c</span><span class=o>.</span><span class=na>output</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>());</span>
</span></span><span class=line><span class=cl> <span class=o>}</span> <span class=k>else</span> <span class=k>if</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>startsWith</span><span class=o>(</span><span class=s>&#34;B&#34;</span><span class=o>))</span> <span class=o>{</span>
</span></span><span class=line><span class=cl> <span class=c1>// Emit to output with tag startsWithBTag.
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=n>c</span><span class=o>.</span><span class=na>output</span><span class=o>(</span><span class=n>startsWithBTag</span><span class=o>,</span> <span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>());</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl> <span class=o>}</span>
</span></span><span class=line><span class=cl> <span class=o>})</span>
</span></span><span class=line><span class=cl> <span class=c1>// Specify main output. In this example, it is the output
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// with tag startsWithATag.
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=o>.</span><span class=na>withOutputTags</span><span class=o>(</span><span class=n>startsWithATag</span><span class=o>,</span>
</span></span><span class=line><span class=cl> <span class=c1>// Specify the output with tag startsWithBTag, as a TupleTagList.
</span></span></span><span class=line><span class=cl><span class=c1></span> <span class=n>TupleTagList</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>startsWithBTag</span><span class=o>)));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=c1>// Get subset of the output with tag startsWithATag.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>mixedCollection</span><span class=o>.</span><span class=na>get</span><span class=o>(</span><span class=n>startsWithATag</span><span class=o>).</span><span class=na>apply</span><span class=o>(...);</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=c1>// Get subset of the output with tag startsWithBTag.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>mixedCollection</span><span class=o>.</span><span class=na>get</span><span class=o>(</span><span class=n>startsWithBTag</span><span class=o>).</span><span class=na>apply</span><span class=o>(...);</span></span></span></code></pre></div></div></div><p>You can use either mechanism to produce multiple output <code>PCollection</code>s. The first option is recommended if it logically does not make sense to combine the processing logic into one <code>ParDo</code>. However, using the second option (a single transform that produces multiple outputs) makes more sense if the transform&rsquo;s computation per element is time-consuming, and is more scalable if you plan to add more output types in the future.</p><h2 id=merging-pcollections>Merging PCollections</h2><p>Often, after you&rsquo;ve branched your <code>PCollection</code> into multiple <code>PCollection</code>s via multiple transforms, you&rsquo;ll want to merge some or all of those resulting <code>PCollection</code>s back together. You can do so by using one of the following:</p><ul><li><strong>Flatten</strong> - You can use the <code>Flatten</code> transform in the Beam SDKs to merge multiple <code>PCollection</code>s of the <strong>same type</strong>.</li><li><strong>Join</strong> - You can use the <code>CoGroupByKey</code> transform in the Beam SDK to perform a relational join between two <code>PCollection</code>s. The <code>PCollection</code>s must be keyed (i.e. they must be collections of key/value pairs) and they must use the same key type.</li></ul><p>The example in figure 4 is a continuation of the example in figure 2 in <a href=#multiple-transforms-process-the-same-pcollection>the
section above</a>. After
branching into two <code>PCollection</code>s, one with names that begin with &lsquo;A&rsquo; and one
with names that begin with &lsquo;B&rsquo;, the pipeline merges the two together into a
single <code>PCollection</code> that now contains all names that begin with either &lsquo;A&rsquo; or
&lsquo;B&rsquo;. Here, it makes sense to use <code>Flatten</code> because the <code>PCollection</code>s being
merged both contain the same type.</p><p><img src=/images/design-your-pipeline-flatten.svg alt="The pipeline merges two collections into one collection with the Flatten transform."></p><p><em>Figure 4: A pipeline that merges two collections into one collection with the Flatten transform.</em></p><p>The following example code applies <code>Flatten</code> to merge two collections.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>//merge the two PCollections with Flatten
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>PCollectionList</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>collectionList</span> <span class=o>=</span> <span class=n>PCollectionList</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>aCollection</span><span class=o>).</span><span class=na>and</span><span class=o>(</span><span class=n>bCollection</span><span class=o>);</span>
</span></span><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>mergedCollectionWithFlatten</span> <span class=o>=</span> <span class=n>collectionList</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Flatten</span><span class=o>.&lt;</span><span class=n>String</span><span class=o>&gt;</span><span class=n>pCollections</span><span class=o>());</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=c1>// continue with the new merged PCollection
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>mergedCollectionWithFlatten</span><span class=o>.</span><span class=na>apply</span><span class=o>(...);</span></span></span></code></pre></div></div></div><h2 id=multiple-sources>Multiple sources</h2><p>Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. In the example illustrated in figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a Kafka topic. The pipeline then uses <code>CoGroupByKey</code> to join this information, where the key is the name; the resulting <code>PCollection</code> contains all the combinations of names, addresses, and orders.</p><p><img src=/images/design-your-pipeline-join.svg alt="The pipeline joins two input collections into one collection with the Join transform."></p><p><em>Figure 5: A pipeline that does a relational join of two input collections.</em></p><p>The following example code applies <code>Join</code> to join two input collections.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;&gt;</span> <span class=n>userAddress</span> <span class=o>=</span> <span class=n>pipeline</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>JdbcIO</span><span class=o>.&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;&gt;</span><span class=n>read</span><span class=o>()...);</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;&gt;</span> <span class=n>userOrder</span> <span class=o>=</span> <span class=n>pipeline</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>KafkaIO</span><span class=o>.&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>&gt;</span><span class=n>read</span><span class=o>()...);</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=kd>final</span> <span class=n>TupleTag</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>addressTag</span> <span class=o>=</span> <span class=k>new</span> <span class=n>TupleTag</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;();</span>
</span></span><span class=line><span class=cl><span class=kd>final</span> <span class=n>TupleTag</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;</span> <span class=n>orderTag</span> <span class=o>=</span> <span class=k>new</span> <span class=n>TupleTag</span><span class=o>&lt;</span><span class=n>String</span><span class=o>&gt;();</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=c1>// Merge collection values into a CoGbkResult collection.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>CoGbkResult</span><span class=o>&gt;&gt;</span> <span class=n>joinedCollection</span> <span class=o>=</span>
</span></span><span class=line><span class=cl> <span class=n>KeyedPCollectionTuple</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>addressTag</span><span class=o>,</span> <span class=n>userAddress</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>and</span><span class=o>(</span><span class=n>orderTag</span><span class=o>,</span> <span class=n>userOrder</span><span class=o>)</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>CoGroupByKey</span><span class=o>.&lt;</span><span class=n>String</span><span class=o>&gt;</span><span class=n>create</span><span class=o>());</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=n>joinedCollection</span><span class=o>.</span><span class=na>apply</span><span class=o>(...);</span></span></span></code></pre></div></div></div><h2 id=whats-next>What&rsquo;s next</h2><ul><li><a href=/documentation/pipelines/create-your-pipeline>Create your own pipeline</a>.</li><li><a href=/documentation/pipelines/test-your-pipeline>Test your pipeline</a>.</li></ul><div class=feedback><p class=update>Last updated on 2024/05/08</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>