# Basics of the Beam model

Apache Beam is a unified model for defining both batch and streaming
data-parallel processing pipelines. To get started with Beam, you'll need to
understand an important set of core concepts:

* [*Pipeline*](#pipeline) - A pipeline is a user-constructed graph of
  transformations that defines the desired data processing operations.
* [*PCollection*](#pcollection) - A `PCollection` is a data set or data stream.
  The data that a pipeline processes is part of a PCollection.
* [*PTransform*](#ptransform) - A `PTransform` (or *transform*) represents a
  data processing operation, or a step, in your pipeline. A transform is
  applied to zero or more `PCollection` objects, and produces zero or more
  `PCollection` objects.
* [*Aggregation*](#aggregation) - Aggregation is computing a value from one or
  more input elements.
* [*User-defined function (UDF)*](#user-defined-function-udf) - Some Beam
  operations allow you to run user-defined code as a way to configure the
  transform.
* [*Schema*](#schema) - A schema is a language-independent type definition for
  a `PCollection`. The schema for a `PCollection` defines elements of that
  `PCollection` as an ordered list of named fields.
* [*SDK*](/documentation/sdks/java/) - A language-specific library that lets
  pipeline authors build transforms, construct their pipelines, and submit
  them to a runner.
* [*Runner*](#runner) - A runner runs a Beam pipeline using the capabilities of
  your chosen data processing engine.
* [*Window*](#window) - A `PCollection` can be subdivided into windows based on
  the timestamps of the individual elements. Windows enable grouping operations
  over collections that grow over time by dividing the collection into windows
  of finite collections.
* [*Watermark*](#watermark) - A watermark is a guess as to when all data in a
  certain window is expected to have arrived. This is needed because data isn't
  always guaranteed to arrive in a pipeline in time order, or to always arrive
  at predictable intervals.
* [*Trigger*](#trigger) - A trigger determines when to aggregate the results of
  each window.
* [*State and timers*](#state-and-timers) - Per-key state and timer callbacks
  are lower-level primitives that give you full control over aggregating input
  collections that grow over time.
* [*Splittable DoFn*](#splittable-dofn) - Splittable DoFns let you process
  elements in a non-monolithic way. You can checkpoint the processing of an
  element, and the runner can split the remaining work to yield additional
  parallelism.

The following sections cover these concepts in more detail and provide links to
additional documentation.
## Pipeline

A Beam pipeline is a graph (specifically, a
[directed acyclic graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph))
of all the data and computations in your data processing task. This includes
reading input data, transforming that data, and writing output data. A pipeline
is constructed by a user in their SDK of choice. Then, the pipeline makes its
way to the runner either through the SDK directly or through the Runner API's
RPC interface. For example, this diagram shows a branching pipeline:

![The pipeline applies two transforms to a single input collection. Each transform produces an output collection.](/images/design-your-pipeline-multiple-pcollections.svg)

In this diagram, the boxes represent the parallel computations called
[*PTransforms*](#ptransform) and the arrows with the circles represent the data
(in the form of [*PCollections*](#pcollection)) that flows between the
transforms. The data might be bounded, stored data sets, or it might be
unbounded streams of data. In Beam, most transforms apply equally to bounded
and unbounded data.

You can express almost any computation that you can think of as a graph as a
Beam pipeline. A Beam driver program typically starts by creating a `Pipeline`
object, and then uses that object as the basis for creating the pipeline's data
sets and its transforms.
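To make this concrete, here is a minimal sketch of a driver program in the
Python SDK (the input strings, step names, and transforms are illustrative
only). The graph is built inside the `with` block and runs when the block
exits.

```python
import apache_beam as beam

# Construct the pipeline graph; it is executed when the `with` block exits.
with beam.Pipeline() as pipeline:
    lines = pipeline | "CreateInput" >> beam.Create(["hello", "beam", "model"])
    upper = lines | "ToUpper" >> beam.Map(str.upper)
    upper | "Print" >> beam.Map(print)
```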
For more information about pipelines, see the following pages:

* [Beam Programming Guide: Overview](/documentation/programming-guide/#overview)
* [Beam Programming Guide: Creating a pipeline](/documentation/programming-guide/#creating-a-pipeline)
* [Design your pipeline](/documentation/pipelines/design-your-pipeline)
* [Create your pipeline](/documentation/pipelines/create-your-pipeline)

## PCollection

A `PCollection` is an unordered bag of elements. Each `PCollection` is a
potentially distributed, homogeneous data set or data stream, and is owned by
the specific `Pipeline` object for which it is created. Multiple pipelines
cannot share a `PCollection`. Beam pipelines process PCollections, and the
runner is responsible for storing these elements.

A `PCollection` generally contains "big data" (too much data to fit in memory on
a single machine). Sometimes a small sample of data or an intermediate result
might fit into memory on a single machine, but Beam's computational patterns and
transforms are focused on situations where distributed data-parallel computation
is required. Therefore, the elements of a `PCollection` cannot be processed
individually, and are instead processed uniformly in parallel.

The following characteristics of a `PCollection` are important to know.

**Bounded vs. unbounded**:

A `PCollection` can be either bounded or unbounded.

* A *bounded* `PCollection` is a dataset of a known, fixed size (alternatively,
  a dataset that is not growing over time). Bounded data can be processed by
  batch pipelines.
* An *unbounded* `PCollection` is a dataset that grows over time, and the
  elements are processed as they arrive. Unbounded data must be processed by
  streaming pipelines.

These two categories derive from the intuitions of batch and stream processing,
but the two are unified in Beam, and bounded and unbounded PCollections can
coexist in the same pipeline. If your runner can only support bounded
PCollections, you must reject pipelines that contain unbounded PCollections. If
your runner is only targeting streams, there are adapters in Beam's support code
to convert everything to APIs that target unbounded data.
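As an illustration of the two categories, here is a sketch in the Python SDK:
reading a fixed set of files yields a bounded `PCollection`, while reading from
a Pub/Sub topic yields an unbounded one. The bucket path and topic name are
hypothetical, and a streaming-capable runner is assumed.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Streaming must be enabled for pipelines that read unbounded sources.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    # Bounded: the set of matching files is known and fixed at read time.
    bounded = pipeline | "ReadFiles" >> beam.io.ReadFromText(
        "gs://my-bucket/logs/*.txt")  # hypothetical path

    # Unbounded: messages keep arriving for as long as the pipeline runs.
    unbounded = pipeline | "ReadTopic" >> beam.io.ReadFromPubSub(
        topic="projects/my-project/topics/my-topic")  # hypothetical topic
```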
**Timestamps**:

Every element in a `PCollection` has a timestamp associated with it.

When you execute a primitive connector to a storage system, that connector is
responsible for providing initial timestamps. The runner must propagate and
aggregate timestamps. If the timestamp is not important, such as with certain
batch processing jobs where elements do not denote events, the timestamp will be
the minimum representable timestamp, often referred to colloquially as "negative
infinity".
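For sources that don't supply meaningful timestamps, or for in-memory test
data, the pipeline author can attach event timestamps explicitly. A small
sketch in the Python SDK, assuming each made-up element carries its own
epoch-seconds timestamp:

```python
import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

events = [("login", 1609459200), ("click", 1609459260)]  # (event, epoch seconds)

with beam.Pipeline() as pipeline:
    stamped = (
        pipeline
        | beam.Create(events)
        # Re-emit each element with the timestamp taken from the element itself.
        | beam.Map(lambda kv: TimestampedValue(kv[0], kv[1]))
    )
```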
**Watermarks**:

Every `PCollection` must have a [watermark](#watermark) that estimates how
complete the `PCollection` is.

The watermark is a guess that "we'll never see an element with an earlier
timestamp". Data sources are responsible for producing a watermark. The runner
must implement watermark propagation as PCollections are processed, merged, and
partitioned.

The contents of a `PCollection` are complete when a watermark advances to
"infinity". In this manner, you can discover that an unbounded PCollection is
finite.

**Windowed elements**:

Every element in a `PCollection` resides in a [window](#window). No element
resides in multiple windows; two elements can be equal except for their window,
but they are not the same.

When elements are written to the outside world, they are effectively placed back
into the global window. Transforms that write data and don't take this
perspective risk data loss.

A window has a maximum timestamp. When the watermark exceeds the maximum
timestamp plus the user-specified allowed lateness, the window is expired. All
data related to an expired window might be discarded at any time.

**Coder**:

Every `PCollection` has a coder, which is a specification of the binary format
of the elements.

In Beam, the user's pipeline can be written in a language other than the
language of the runner. There is no expectation that the runner can actually
deserialize user data. The Beam model operates principally on encoded data,
"just bytes". Each `PCollection` has a declared encoding for its elements,
called a coder. A coder has a URN that identifies the encoding, and might have
additional sub-coders. For example, a coder for lists might contain a coder for
the elements of the list. Language-specific serialization techniques are
frequently used, but there are a few common key formats (such as key-value pairs
and timestamps) so the runner can understand them.
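In the Python SDK, for example, you can declare a custom coder and register it
for a type so that elements of that type can be materialized as bytes. The
`Point` class and its "x,y" string encoding below are purely illustrative.

```python
import apache_beam as beam

class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

class PointCoder(beam.coders.Coder):
    """Encodes a Point as a UTF-8 "x,y" string (illustrative format)."""
    def encode(self, value):
        return f"{value.x},{value.y}".encode("utf-8")

    def decode(self, encoded):
        x, y = encoded.decode("utf-8").split(",")
        return Point(float(x), float(y))

    def is_deterministic(self):
        return True

# Tell Beam to use PointCoder whenever a PCollection holds Point elements.
beam.coders.registry.register_coder(Point, PointCoder)
```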
**Windowing strategy**:

Every `PCollection` has a windowing strategy, which is a specification of
essential information for grouping and triggering operations. The `Window`
transform sets up the windowing strategy, and the `GroupByKey` transform has
behavior that is governed by the windowing strategy.

For more information about PCollections, see the following page:

* [Beam Programming Guide: PCollections](/documentation/programming-guide/#pcollections)

## PTransform

A `PTransform` (or transform) represents a data processing operation, or a step,
in your pipeline. A transform is usually applied to one or more input
`PCollection` objects. Transforms that read input are an exception; these
transforms might not have an input `PCollection`.

You provide transform processing logic in the form of a function object
(colloquially referred to as "user code"), and your user code is applied to each
element of the input PCollection (or more than one PCollection). Depending on
the pipeline runner and backend that you choose, many different workers across a
cluster might execute instances of your user code in parallel. The user code
that runs on each worker generates the output elements that are added to zero or
more output `PCollection` objects.

The Beam SDKs contain a number of different transforms that you can apply to
your pipeline's PCollections. These include general-purpose core transforms,
such as `ParDo` or `Combine`. There are also pre-written composite transforms
included in the SDKs, which combine one or more of the core transforms in a
useful processing pattern, such as counting or combining elements in a
collection. You can also define your own more complex composite transforms to
fit your pipeline's exact use case.
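For instance, a composite transform in the Python SDK packages a reusable
processing pattern behind a single apply. The word-count-style `CountWords`
transform below is an illustrative sketch, not a transform shipped with Beam.

```python
import apache_beam as beam

class CountWords(beam.PTransform):
    """A composite transform built from core transforms (FlatMap + Count)."""
    def expand(self, lines):
        return (
            lines
            | "SplitWords" >> beam.FlatMap(lambda line: line.split())
            | "CountEach" >> beam.combiners.Count.PerElement()
        )

with beam.Pipeline() as pipeline:
    counts = (
        pipeline
        | beam.Create(["to be or not to be"])
        | CountWords()
    )
```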
The following list has some common transform types:

* Source transforms such as `TextIO.Read` and `Create`. A source transform
  conceptually has no input.
* Processing and conversion operations such as `ParDo`, `GroupByKey`,
  `CoGroupByKey`, `Combine`, and `Count`.
* Outputting transforms such as `TextIO.Write`.
* User-defined, application-specific composite transforms.

For more information about transforms, see the following pages:

* [Beam Programming Guide: Overview](/documentation/programming-guide/#overview)
* [Beam Programming Guide: Transforms](/documentation/programming-guide/#transforms)
* Beam transform catalog ([Java](/documentation/transforms/java/overview/),
  [Python](/documentation/transforms/python/overview/))

## Aggregation

Aggregation is computing a value from one or more input elements. In Beam, the
primary computational pattern for aggregation is to group all elements with a
common key and window, then combine each group of elements using an associative
and commutative operation. This is similar to the "Reduce" operation in the
[MapReduce](https://en.wikipedia.org/wiki/MapReduce) model, though it is
enhanced to work with unbounded input streams as well as bounded data sets.

![Aggregation of elements.](/images/aggregation.png)

*Figure 1: Aggregation of elements. Elements with the same color represent those
with a common key and window.*

Some simple aggregation transforms include `Count` (computes the count of all
elements in the aggregation), `Max` (computes the maximum element in the
aggregation), and `Sum` (computes the sum of all elements in the aggregation).

When elements are grouped and emitted as a bag, the aggregation is known as
`GroupByKey` (the associative/commutative operation is bag union). In this case,
the output is no smaller than the input. Often, you will apply an operation such
as summation, called a `CombineFn`, in which the output is significantly smaller
than the input. In this case, the aggregation is called `CombinePerKey`.
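The difference is easy to see in a short Python SDK sketch over made-up
key-value data: `GroupByKey` emits the grouped bags, while `CombinePerKey(sum)`
reduces each group to a single value.

```python
import apache_beam as beam

scores = [("alice", 3), ("bob", 5), ("alice", 7)]

with beam.Pipeline() as pipeline:
    kvs = pipeline | beam.Create(scores)

    # GroupByKey: output is a bag of values per key, no smaller than the input.
    grouped = kvs | beam.GroupByKey()       # ("alice", [3, 7]), ("bob", [5])

    # CombinePerKey: an associative, commutative operation (here, sum)
    # reduces each group to one value.
    totals = kvs | beam.CombinePerKey(sum)  # ("alice", 10), ("bob", 5)
```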
In a real application, you might have millions of keys and/or windows; that is
why this is still an "embarrassingly parallel" computational pattern. In those
cases where you have fewer keys, you can add parallelism by adding a
supplementary key, splitting each of your problem's natural keys into many
sub-keys. After these sub-keys are aggregated, the results can be further
combined into a result for the original natural key for your problem. The
associativity of your aggregation function ensures that this yields the same
answer, but with more parallelism.

When your input is unbounded, the computational pattern of grouping elements by
key and window is roughly the same, but governing when and how to emit the
results of aggregation involves three concepts:

* [Windowing](#window), which partitions your input into bounded subsets that
  can be complete.
* [Watermarks](#watermark), which estimate the completeness of your input.
* [Triggers](#trigger), which govern when and how to emit aggregated results.

For more information about available aggregation transforms, see the following
pages:

* [Beam Programming Guide: Core Beam transforms](/documentation/programming-guide/#core-beam-transforms)
* Beam Transform catalog
  ([Java](/documentation/transforms/java/overview/#aggregation),
  [Python](/documentation/transforms/python/overview/#aggregation))

## User-defined function (UDF)

Some Beam operations allow you to run user-defined code as a way to configure
the transform. For example, when using `ParDo`, user-defined code specifies what
operation to apply to every element. For `Combine`, it specifies how values
should be combined. By using [cross-language transforms](/documentation/patterns/cross-language/),
a Beam pipeline can contain UDFs written in a different language, or even
multiple languages in the same pipeline.

Beam has several varieties of UDFs:

* [*DoFn*](/documentation/programming-guide/#pardo) - per-element processing
  function (used in `ParDo`; see the sketch after this list)
* [*WindowFn*](/documentation/programming-guide/#setting-your-pcollections-windowing-function) -
  places elements in windows and merges windows (used in `Window` and
  `GroupByKey`)
* [*ViewFn*](/documentation/programming-guide/#side-inputs) - adapts a
  materialized `PCollection` to a particular interface (used in side inputs)
* [*WindowMappingFn*](/documentation/programming-guide/#side-inputs-windowing) -
  maps one element's window to another, and specifies bounds on how far in the
  past the result window will be (used in side inputs)
* [*CombineFn*](/documentation/programming-guide/#combine) - associative and
  commutative aggregation (used in `Combine` and state)
* [*Coder*](/documentation/programming-guide/#data-encoding-and-type-safety) -
  encodes user data; some coders have standard formats and are not really UDFs
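As a sketch of what two of these varieties look like in the Python SDK (the
element format and the averaging logic below are illustrative only):

```python
import apache_beam as beam

class ParseEventFn(beam.DoFn):
    """DoFn: per-element processing, applied with ParDo."""
    def process(self, element):
        user, score = element.split(",")
        yield (user, int(score))

class MeanFn(beam.CombineFn):
    """CombineFn: associative and commutative aggregation (a running mean)."""
    def create_accumulator(self):
        return (0.0, 0)                         # (sum, count)

    def add_input(self, acc, value):
        return (acc[0] + value, acc[1] + 1)

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return (sum(sums), sum(counts))

    def extract_output(self, acc):
        return acc[0] / acc[1] if acc[1] else float("nan")

with beam.Pipeline() as pipeline:
    means = (
        pipeline
        | beam.Create(["alice,3", "alice,7", "bob,5"])
        | beam.ParDo(ParseEventFn())
        | beam.CombinePerKey(MeanFn())
    )
```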
Each language SDK has its own idiomatic way of expressing the user-defined
functions in Beam, but there are common requirements. When you build user code
for a Beam transform, you should keep in mind the distributed nature of
execution. For example, there might be many copies of your function running on a
lot of different machines in parallel, and those copies function independently,
without communicating or sharing state with any of the other copies. Each copy
of your user code function might be retried or run multiple times, depending on
the pipeline runner and the processing backend that you choose for your
pipeline. Beam also supports stateful processing through the
[stateful processing API](/blog/stateful-processing/).

For more information about user-defined functions, see the following pages:

* [Requirements for writing user code for Beam transforms](/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms)
* [Beam Programming Guide: ParDo](/documentation/programming-guide/#pardo)
* [Beam Programming Guide: WindowFn](/documentation/programming-guide/#setting-your-pcollections-windowing-function)
* [Beam Programming Guide: CombineFn](/documentation/programming-guide/#combine)
* [Beam Programming Guide: Coder](/documentation/programming-guide/#data-encoding-and-type-safety)
* [Beam Programming Guide: Side inputs](/documentation/programming-guide/#side-inputs)

## Schema

A schema is a language-independent type definition for a `PCollection`. The
schema for a `PCollection` defines elements of that `PCollection` as an ordered
list of named fields. Each field has a name, a type, and possibly a set of user
options.

In many cases, the element type in a `PCollection` has a structure that can be
introspected. Some examples are JSON, Protocol Buffer, Avro, and database row
objects. All of these formats can be converted to Beam Schemas. Even within an
SDK pipeline, simple Java POJOs (or equivalent structures in other languages)
are often used as intermediate types, and these also have a clear structure that
can be inferred by inspecting the class. By understanding the structure of a
pipeline's records, we can provide much more concise APIs for data processing.
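In the Python SDK, for example, a schema can be declared with a `NamedTuple`
and registered with `RowCoder`; the `Purchase` type and its fields below are
hypothetical.

```python
import typing
import apache_beam as beam

class Purchase(typing.NamedTuple):
    user_id: str
    item: str
    amount: float

# Registering RowCoder tells Beam to treat Purchase as a schema'd type,
# so schema-aware transforms can refer to its fields by name.
beam.coders.registry.register_coder(Purchase, beam.coders.RowCoder)

with beam.Pipeline() as pipeline:
    purchases = (
        pipeline
        | beam.Create([("u1", "book", 12.5), ("u2", "pen", 1.2)])
        | beam.Map(lambda t: Purchase(*t)).with_output_types(Purchase)
    )
```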
Beam provides a collection of transforms that operate natively on schemas. For
example, [Beam SQL](/documentation/dsls/sql/overview/) is a common transform
that operates on schemas. These transforms allow selections and aggregations in
terms of named schema fields. Another advantage of schemas is that they allow
referencing of element fields by name. Beam provides a selection syntax for
referencing fields, including nested and repeated fields.

For more information about schemas, see the following pages:

* [Beam Programming Guide: Schemas](/documentation/programming-guide/#schemas)
* [Schema Patterns](/documentation/patterns/schema/)

## Runner

A Beam runner runs a Beam pipeline on a specific platform. Most runners are
translators or adapters to massively parallel big data processing systems, such
as Apache Flink, Apache Spark, Google Cloud Dataflow, and more. For example, the
Flink runner translates a Beam pipeline into a Flink job. The Direct Runner runs
pipelines locally so you can test, debug, and validate that your pipeline
adheres to the Apache Beam model as closely as possible.
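The runner is typically chosen through pipeline options rather than code
changes. A minimal sketch in the Python SDK: swapping `DirectRunner` for, say,
`DataflowRunner` or `FlinkRunner` (each with its own additional required
options) leaves the pipeline code itself untouched.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# DirectRunner executes locally; other runners submit the same graph
# to a distributed processing engine.
options = PipelineOptions(["--runner=DirectRunner"])

with beam.Pipeline(options=options) as pipeline:
    pipeline | beam.Create([1, 2, 3]) | beam.Map(print)
```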
adheres to the Apache Beam model as closely as possible.</p><p>For an up-to-date list of Beam runners and which features of the Apache Beam
model they support, see the runner
<a href=/documentation/runners/capability-matrix/>capability matrix</a>.</p><p>For more information about runners, see the following pages:</p><ul><li><a href=/documentation/#choosing-a-runner>Choosing a Runner</a></li><li><a href=/documentation/runners/capability-matrix/>Beam Capability Matrix</a></li></ul><h2 id=window>Window</h2><p>Windowing subdivides a <code>PCollection</code> into <em>windows</em> according to the timestamps
of its individual elements. Windows enable grouping operations over unbounded
collections by dividing the collection into a succession of finite windows.</p><p>A <em>windowing function</em> tells the runner how to assign elements to one or more
initial windows, and how to merge windows of grouped elements. Each element in a
<code>PCollection</code> can only be in one window, so if a windowing function specifies
multiple windows for an element, the element is conceptually duplicated into
each of those windows, and the copies are identical except for their window.</p><p>Transforms that aggregate multiple elements, such as <code>GroupByKey</code> and <code>Combine</code>,
work implicitly on a per-window basis; they process each <code>PCollection</code> as a
succession of multiple, finite windows, though the entire collection itself may
be of unbounded size.</p><p>Beam provides several windowing functions:</p><ul><li><strong>Fixed time windows</strong> (also known as &ldquo;tumbling windows&rdquo;) represent a consistent
duration, non-overlapping time interval in the data stream.</li><li><strong>Sliding time windows</strong> (also known as &ldquo;hopping windows&rdquo;) also represent time
intervals in the data stream; however, sliding time windows can overlap.</li><li><strong>Per-session windows</strong> define windows that contain elements that are within a
certain gap duration of another element.</li><li><strong>Single global window</strong>: by default, all data in a <code>PCollection</code> is assigned to
the single global window, and late data is discarded.</li><li><strong>Calendar-based windows</strong> (not supported by the Beam SDK for Python)</li></ul><p>You can also define your own windowing function if you have more complex
requirements.</p><p>For example, let&rsquo;s say we have a <code>PCollection</code> that uses fixed-time windowing,
with windows that are five minutes long. For each window, Beam must collect all
the data with an event time timestamp in the given window range (between 0:00
and 4:59 in the first window, for instance). Data with timestamps outside that
range (data from 5:00 or later) belongs to a different window.</p>
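<p>A condensed sketch of this example in the Python SDK (the keys and timestamps are made up): event timestamps are attached to the elements, the elements are assigned to five-minute fixed windows, and the subsequent <code>GroupByKey</code> then groups per key <em>and</em> per window.</p>
<pre><code>import apache_beam as beam
from apache_beam import window

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([('user1', 3), ('user2', 5), ('user1', 8)])
        # Real sources usually provide event timestamps; here we attach
        # them by hand as seconds since the epoch.
        | beam.Map(lambda kv: window.TimestampedValue(kv, 1672531200 + kv[1]))
        # Assign each element to a five-minute (300 second) fixed window.
        | beam.WindowInto(window.FixedWindows(300))
        # Grouping now happens separately for each window.
        | beam.GroupByKey()
        | beam.Map(print))
</code></pre>
<p>Two concepts are closely related to windowing and covered in the following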
sections: <a href=#watermark>watermarks</a> and <a href=#trigger>triggers</a>.</p><p>For more information about windows, see the following page:</p><ul><li><a href=/documentation/programming-guide/#windowing>Beam Programming Guide: Windowing</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Beam Programming Guide: WindowFn</a></li></ul><h2 id=watermark>Watermark</h2><p>In any data processing system, there is a certain amount of lag between the time
a data event occurs (the “event time”, determined by the timestamp on the data
element itself) and the time the actual data element gets processed at any stage
in your pipeline (the “processing time”, determined by the clock on the system
processing the element). In addition, data isn’t always guaranteed to arrive in
a pipeline in time order, or to arrive at predictable intervals. For
example, you might have intermediate systems that don&rsquo;t preserve order, or you
might have two servers that timestamp data, but one has a slower network
connection, so its data arrives later.</p><p>To address this potential unpredictability, Beam tracks a <em>watermark</em>. A
watermark is a guess as to when all data in a certain window is expected to have
arrived in the pipeline. You can also think of the watermark as the pipeline’s assertion that “we’ll never see an
element with an earlier timestamp”.</p><p>Data sources are responsible for producing a watermark, and every <code>PCollection</code>
must have a watermark that estimates how complete the <code>PCollection</code> is. The
contents of a <code>PCollection</code> are complete when a watermark advances to
“infinity”. In this manner, even an unbounded <code>PCollection</code> can turn out to
contain only a finite amount of data. After the watermark progresses past the end of a window, any further
element that arrives with a timestamp in that window is considered <em>late data</em>.</p>
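<p>How much lateness a pipeline tolerates is part of its windowing strategy. As a sketch in the Python SDK (the window size, lateness bound, and data are arbitrary), the <code>allowed_lateness</code> argument below keeps each one-minute window open for two extra minutes after the watermark passes its end; elements that arrive later than that are dropped.</p>
<pre><code>import apache_beam as beam
from apache_beam import window

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([('sensor-1', 20.5), ('sensor-2', 19.8)])
        | beam.Map(lambda kv: window.TimestampedValue(kv, 1672531200))
        | beam.WindowInto(
            window.FixedWindows(60),   # one-minute windows
            allowed_lateness=120)      # accept up to two minutes of late data
        | beam.GroupByKey()
        | beam.Map(print))
</code></pre>
<p><a href=#trigger>Triggers</a> are a related concept that allow you to modify and refine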
the windowing strategy for a <code>PCollection</code>. You can use triggers to decide when
each individual window aggregates and reports its results, including how the
window emits late elements.</p><p>For more information about watermarks, see the following page:</p><ul><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Beam Programming Guide: Watermarks and late data</a></li></ul><h2 id=trigger>Trigger</h2><p>When collecting and grouping data into windows, Beam uses <em>triggers</em> to
determine when to emit the aggregated results of each window (referred to as a
<em>pane</em>). If you use Beam’s default windowing configuration and default trigger,
Beam outputs the aggregated result when it estimates all data has arrived, and
discards all subsequent data for that window.</p><p>At a high level, triggers provide two additional capabilities compared to
outputting at the end of a window:</p><ol><li>Triggers allow Beam to emit early results, before all the data in a given
window has arrived. For example, emitting after a certain amount of time
elapses, or after a certain number of elements arrive.</li><li>Triggers allow processing of late data by triggering after the event time
watermark passes the end of the window.</li></ol><p>These capabilities allow you to control the flow of your data and also balance
between data completeness, latency, and cost.</p><p>Beam provides a number of pre-built triggers that you can set:</p><ul><li><strong>Event time triggers</strong>: These triggers operate on the event time, as
indicated by the timestamp on each data element. Beam’s default trigger is
event time-based.</li><li><strong>Processing time triggers</strong>: These triggers operate on the processing time,
which is the time when the data element is processed at any given stage in
the pipeline.</li><li><strong>Data-driven triggers</strong>: These triggers operate by examining the data as it
arrives in each window, and firing when that data meets a certain property.
Currently, data-driven triggers only support firing after a certain number of
data elements.</li><li><strong>Composite triggers</strong>: These triggers combine multiple triggers in various
ways. For example, you might want one trigger for early data and a different
trigger for late data.</li></ul>
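<p>A sketch of how these triggers compose in the Python SDK (the window size, delays, and counts are arbitrary): the composite trigger below emits a speculative early pane every 30 seconds of processing time, an on-time pane when the watermark passes the end of the window, and a refinement pane for every ten late elements, with panes accumulating across firings.</p>
<pre><code>import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark)

windowing = beam.WindowInto(
    window.FixedWindows(300),
    trigger=AfterWatermark(
        early=AfterProcessingTime(30),   # speculative results every 30s
        late=AfterCount(10)),            # refinements per 10 late elements
    accumulation_mode=AccumulationMode.ACCUMULATING,
    allowed_lateness=600)

# Applied like any other transform, for example:
#   scores | windowing | beam.CombinePerKey(sum)
</code></pre>
<p>For more information about triggers, see the following page:</p><ul><li><a href=/documentation/programming-guide/#triggers>Beam Programming Guide: Triggers</a></li></ul><h2 id=state-and-timers>State and timers</h2><p>Beam’s windowing and triggers provide an abstraction for grouping and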
aggregating unbounded input data based on timestamps. However, there are
aggregation use cases that might require an even higher degree of control. State
and timers are two important concepts that help with these use cases. Like
other aggregations, state and timers are processed per window.</p><p><strong>State</strong>:</p><p>Beam provides the State API for manually managing per-key state, allowing for
fine-grained control over aggregations. The State API lets you augment
element-wise operations (for example, <code>ParDo</code> or <code>Map</code>) with mutable state. Like
other aggregations, state is processed per window.</p><p>The State API models state per key. To use the state API, you start out with a
keyed <code>PCollection</code>. A <code>ParDo</code> that processes this <code>PCollection</code> can declare
persistent state variables. When you process each element inside the <code>ParDo</code>,
you can use the state variables to write or update state for the current key or
to read previous state written for that key. State is always fully scoped only
to the current processing key.</p><p>Beam provides several types of state, though different runners might support a
different subset of these states.</p><ul><li><strong>ValueState</strong>: ValueState is a scalar state value. For each key in the
input, a ValueState stores a typed value that can be read and modified inside
the <code>DoFn</code>.</li><li>A common use case for state is to accumulate multiple elements into a group:<ul><li><strong>BagState</strong>: BagState allows you to accumulate elements in an unordered
bag. This lets you add elements to a collection without needing to read any
of the previously accumulated elements.</li><li><strong>MapState</strong>: MapState allows you to accumulate elements in a map.</li><li><strong>SetState</strong>: SetState allows you to accumulate elements in a set.</li><li><strong>OrderedListState</strong>: OrderedListState allows you to accumulate elements in
a timestamp-sorted list.</li></ul></li><li><strong>CombiningState</strong>: CombiningState allows you to create a state object that
is updated using a Beam combiner. Like BagState, you can add elements to an
aggregation without needing to read the current value, and the accumulator
can be compacted using a combiner.</li></ul>
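<p>In the Python SDK, the scalar ValueState described above is exposed as <code>ReadModifyWriteStateSpec</code>. The following sketch (the <code>DoFn</code> name and the counting logic are illustrative) keeps a running per-key count inside a <code>ParDo</code> over a keyed <code>PCollection</code>:</p>
<pre><code>import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class RunningCountFn(beam.DoFn):
    """Emits (key, running_count) for every element of a keyed PCollection."""
    COUNT = ReadModifyWriteStateSpec('count', VarIntCoder())

    def process(self, element, count_state=beam.DoFn.StateParam(COUNT)):
        key, _value = element
        current = count_state.read() or 0   # state is scoped to the current key
        count_state.write(current + 1)
        yield key, current + 1

# Usage: a keyed PCollection such as ('user1', event) pairs:
#   events | beam.ParDo(RunningCountFn())
</code></pre>
<p>You can use the State API together with the Timer API to create processing tasks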
that give you fine-grained control over the workflow.</p><p><strong>Timers</strong>:</p><p>Beam provides a per-key timer callback API that enables delayed processing of
data stored using the State API. The Timer API lets you set timers to call back
at either an event-time or a processing-time timestamp. For more advanced use
cases, your timer callback can set another timer. Like other aggregations,
timers are processed per window.</p><p>The following timers are available:</p><ul><li><strong>Event-time timers</strong>: Event-time timers fire when the input watermark for
the <code>DoFn</code> passes the time at which the timer is set, meaning that the runner
believes that there are no more elements to be processed with timestamps
before the timer timestamp. This allows for event-time aggregations.</li><li><strong>Processing-time timers</strong>: Processing-time timers fire when the real wall-clock
time passes the time at which the timer is set. This is often used to create larger batches of data before
processing. It can also be used to schedule events that should occur at a
specific time.</li><li><strong>Dynamic timer tags</strong>: Beam also supports dynamically setting a timer tag. This
allows you to set multiple different timers in a <code>DoFn</code> and dynamically
choose timer tags (for example, based on data in the input elements).</li></ul>
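<p>The sketch below (Python SDK; the 60-second gap and the string coder are arbitrary choices) combines <code>BagState</code> with an event-time timer: values are buffered per key, and the buffered values are emitted once the watermark passes one minute beyond the latest element seen for that key.</p>
<pre><code>import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

class BufferThenFlushFn(beam.DoFn):
    BUFFER = BagStateSpec('buffer', StrUtf8Coder())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self,
                element,
                timestamp=beam.DoFn.TimestampParam,
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH)):
        key, value = element
        buffer.add(value)
        # Call back once the watermark passes 60 seconds after this element.
        flush.set(timestamp + 60)

    @on_timer(FLUSH)
    def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
        yield list(buffer.read())
        buffer.clear()

# Usage: pairs like ('user1', 'click') in events | beam.ParDo(BufferThenFlushFn())
</code></pre>
<p>For more information about state and timers, see the following pages:</p><ul><li><a href=/documentation/programming-guide/#state-and-timers>Beam Programming Guide: State and Timers</a></li><li><a href=/blog/stateful-processing/>Stateful processing with Apache Beam</a></li><li><a href=/blog/timely-processing/>Timely (and Stateful) Processing with Apache Beam</a></li></ul><h2 id=splittable-dofn>Splittable DoFn</h2><p>Splittable <code>DoFn</code> (SDF) is a generalization of <code>DoFn</code> that lets you process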
elements in a non-monolithic way. Splittable <code>DoFn</code> makes it easier to create
complex, modular I/O connectors in Beam.</p><p>A regular <code>ParDo</code> processes an entire element at a time, applying your regular
<code>DoFn</code> and waiting for the call to terminate. When you instead apply a
splittable <code>DoFn</code> to each element, the runner has the option of splitting the
element&rsquo;s processing into smaller tasks. You can checkpoint the processing of an
element, and you can split the remaining work to yield additional parallelism.</p><p>For example, imagine you want to read every line from very large text files.
When you write your splittable <code>DoFn</code>, you can have separate pieces of logic to
read a segment of a file, split a segment of a file into sub-segments, and
report progress through the current segment. The runner can then invoke your
splittable <code>DoFn</code> intelligently to split up each input and read portions
separately, in parallel.</p><p>A common computation pattern has the following steps:</p><ol><li>The runner splits an incoming element before starting any processing.</li><li>The runner starts running your processing logic on each sub-element.</li><li>If the runner notices that some sub-elements are taking longer than others,
the runner splits those sub-elements further and repeats step 2.</li><li>The sub-element either finishes processing, or the user chooses to
checkpoint the sub-element and the runner repeats step 2.</li></ol><p>You can also write your splittable <code>DoFn</code> so the runner can split the unbounded
processing. For example, if you write a splittable <code>DoFn</code> to watch a set of
directories and output filenames as they arrive, you can split to subdivide the
work of different directories. This allows the runner to split off a hot
directory and give it additional resources.</p>
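<p>A condensed sketch of such a file-reading splittable <code>DoFn</code> in the Python SDK is shown below. It pairs a <code>DoFn</code> with a restriction provider that describes each file as a byte range (an <code>OffsetRange</code>); the runner may split that range, and the <code>DoFn</code> claims positions as it reads. For brevity, the sketch skips details a real connector needs, such as re-aligning to a line boundary after a split.</p>
<pre><code>import os

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

class FileOffsetProvider(RestrictionProvider):
    """Describes the work for one file as a splittable byte range."""
    def initial_restriction(self, file_name):
        return OffsetRange(0, os.path.getsize(file_name))

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, file_name, restriction):
        return restriction.size()

class ReadLinesFn(beam.DoFn):
    def process(self,
                file_name,
                tracker=beam.DoFn.RestrictionParam(FileOffsetProvider())):
        with open(file_name, 'rb') as f:
            f.seek(tracker.current_restriction().start)
            pos = f.tell()
            # Claim each position before reading; if the runner has split off
            # the rest of the range, try_claim returns False and we stop.
            while tracker.try_claim(pos):
                line = f.readline()
                if not line:
                    break
                yield line
                pos = f.tell()
</code></pre>
<p>For more information about Splittable <code>DoFn</code>, see the following pages:</p><ul><li><a href=/documentation/programming-guide/#splittable-dofns>Splittable DoFns</a></li><li><a href=/blog/splittable-do-fn-is-available/>Splittable DoFn in Apache Beam is Ready to Use</a></li></ul><h2 id=whats-next>What&rsquo;s next</h2><p>Take a look at our <a href=/documentation/>other documentation</a> such as the Beam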
programming guide, pipeline execution information, and transform reference
catalogs.</p><p class=update>Last updated on 2024/05/03</p></div></div></body></html>