| <!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Beam WordCount Examples</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.d653ded46cd5f19a535cb20567fce9699849fe46f950d91ac6bf336db8ff8724.css as=style><link href=/scss/main.min.d653ded46cd5f19a535cb20567fce9699849fe46f950d91ac6bf336db8ff8724.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script> |
| <script type=text/javascript src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script> |
| <script type=text/javascript src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script> |
| <script type=text/javascript src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script> |
| <script type=text/javascript src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script> |
| <script type=text/javascript src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script> |
| <script type=text/javascript src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script> |
| <script type=text/javascript src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script> |
| <script type=text/javascript src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script> |
| <script type=text/javascript src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script> |
| <link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/get-started/wordcount-example/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script> |
| <script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","//www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script><script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a> |
| <a class=navbar-link href=/get-started/>Get Started</a> |
| <a class=navbar-link href=/documentation/>Documentation</a> |
| <button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()> |
| <span class=sr-only>Toggle navigation</span> |
| <span class=icon-bar></span> |
| <span class=icon-bar></span> |
| <span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu> |
| <span class=sr-only>Toggle navigation</span> |
| <span class=icon-bar></span> |
| <span class=icon-bar></span> |
| <span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/get-started/wordcount-example.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px> |
| Apache |
| <span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a> |
| <a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation |
| <span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a> |
| <a class=navbar-link href=/community/>Community</a> |
| <a class=navbar-link href=/contribute/>Contribute</a> |
| <a class=navbar-link href=/blog/>Blog</a> |
| <a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/get-started/wordcount-example.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px> |
| Apache |
| <span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search> |
| <a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam"> |
| <img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning"> |
| <img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script> |
| <script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script> |
| <script>function showSearch(){addPlaceholder();var e,t=document.querySelector(".searchBar");t.classList.remove("disappear"),e=document.querySelector("#iconsBar"),e.classList.add("disappear")}function addPlaceholder(){$("input:text").attr("placeholder","What are you looking for?")}function endSearch(){var e,t=document.querySelector(".searchBar");t.classList.add("disappear"),e=document.querySelector("#iconsBar"),e.classList.remove("disappear")}function blockScroll(){$("body").toggleClass("fixedPosition")}function openMenu(){addPlaceholder(),blockScroll()}</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Get started</span></li><li><a href=/get-started/beam-overview/>Beam Overview</a></li><li><a href=/get-started/an-interactive-overview-of-beam/>An Interactive Overview of Beam</a></li><li><span class=section-nav-list-title>Quickstarts</span><ul class=section-nav-list><li><a href=https://tour.beam.apache.org>Tour of Beam</a></li><li><a href=/get-started/try-apache-beam/>Try Apache Beam</a></li><li><a href=/get-started/try-beam-playground/>Try Beam Playground</a></li><li><a href=/get-started/quickstart/java/>Java quickstart</a></li><li><a href=/get-started/quickstart/python/>Python quickstart</a></li><li><a href=/get-started/quickstart/go/>Go quickstart</a></li><li><a href=/get-started/quickstart/typescript/>Typescript quickstart</a></li><li><a href=/get-started/from-spark/>Apache Spark</a></li><li><a href=/get-started/quickstart-java/>WordCount (Java)</a></li><li><a href=/get-started/quickstart-py/>WordCount (Python)</a></li><li><a href=/get-started/quickstart-go/>WordCount (Go)</a></li></ul></li><li><a href=/get-started/downloads>Install the SDK</a></li><li><span class=section-nav-list-title>Tutorials</span><ul 
class=section-nav-list><li><a href=/get-started/wordcount-example/>WordCount</a></li><li><a href=/get-started/mobile-gaming-example/>Mobile Gaming</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Learning resources</span><ul class=section-nav-list><li><a href=/get-started/resources/learning-resources/#getting-started>Getting Started</a></li><li><a href=/get-started/resources/learning-resources/#articles>Articles</a></li><li><a href=/get-started/resources/learning-resources/#videos>Videos</a></li><li><a href=/get-started/resources/learning-resources/#courses>Courses</a></li><li><a href=/get-started/resources/learning-resources/#books>Books</a></li><li><a href=/get-started/resources/learning-resources/#certifications>Certifications</a></li><li><a href=/get-started/resources/learning-resources/#interactive-labs>Interactive Labs</a></li><li><a href=/get-started/resources/learning-resources/#beam-katas>Beam Katas</a></li><li><a href=/get-started/resources/learning-resources/#code-examples>Code Examples</a></li><li><a href=/get-started/resources/learning-resources/#api-reference>API Reference</a></li><li><a href=/get-started/resources/learning-resources/#feedback-and-suggestions>Feedback and Suggestions</a></li><li><a href=/get-started/resources/learning-resources/#how-to-contribute>How to Contribute</a></li><li><a href=/get-started/resources/videos-and-podcasts>Videos and Podcasts</a></li></ul></li><li><a href=/security>Security</a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#minimalwordcount-example>MinimalWordCount example</a><ul><li><a href=#creating-the-pipeline>Creating the pipeline</a></li><li><a href=#applying-pipeline-transforms>Applying pipeline transforms</a></li><li><a href=#running-the-pipeline>Running the pipeline</a></li><li><a href=#try-the-full-example-in-playground>Try the full example in 
Playground</a></li></ul></li><li><a href=#wordcount-example>WordCount example</a><ul><li><a href=#specifying-explicit-dofns>Specifying explicit DoFns</a></li><li><a href=#creating-composite-transforms>Creating composite transforms</a></li><li><a href=#using-parameterizable-pipelineoptions>Using parameterizable PipelineOptions</a></li><li><a href=#try-the-full-example-in-playground-1>Try the full example in Playground</a></li></ul></li><li><a href=#debuggingwordcount-example>DebuggingWordCount example</a><ul><li><a href=#logging>Logging</a><ul><li><a href=#direct-runner>Direct Runner</a></li><li><a href=#cloud-dataflow-runner>Cloud Dataflow Runner</a></li><li><a href=#apache-spark-runner>Apache Spark Runner</a></li><li><a href=#apache-flink-runner>Apache Flink Runner</a></li><li><a href=#apache-nemo-runner>Apache Nemo Runner</a></li></ul></li><li><a href=#testing-your-pipeline-with-asserts>Testing your pipeline with asserts</a></li><li><a href=#try-the-full-example-in-playground-2>Try the full example in Playground</a></li></ul></li><li><a href=#windowedwordcount-example>WindowedWordCount example</a><ul><li><a href=#unbounded-and-bounded-datasets>Unbounded and bounded datasets</a></li><li><a href=#adding-timestamps-to-data>Adding timestamps to data</a></li><li><a href=#windowing>Windowing</a></li><li><a href=#reusing-ptransforms-over-windowed-pcollections>Reusing PTransforms over windowed PCollections</a></li></ul></li><li><a href=#streamingwordcount-example>StreamingWordCount example</a><ul><li><a href=#reading-an-unbounded-dataset>Reading an unbounded dataset</a></li><li><a href=#writing-unbounded-results>Writing unbounded results</a></li></ul></li><li><a href=#next-steps>Next Steps</a></li></ul></nav></nav><div class="body__contained body__section-nav"><h1 id=apache-beam-wordcount-examples>Apache Beam WordCount Examples</h1><nav id=TableOfContents><ul><li><a href=#minimalwordcount-example>MinimalWordCount example</a><ul><li><a href=#creating-the-pipeline>Creating 
the pipeline</a></li><li><a href=#applying-pipeline-transforms>Applying pipeline transforms</a></li><li><a href=#running-the-pipeline>Running the pipeline</a></li><li><a href=#try-the-full-example-in-playground>Try the full example in Playground</a></li></ul></li><li><a href=#wordcount-example>WordCount example</a><ul><li><a href=#specifying-explicit-dofns>Specifying explicit DoFns</a></li><li><a href=#creating-composite-transforms>Creating composite transforms</a></li><li><a href=#using-parameterizable-pipelineoptions>Using parameterizable PipelineOptions</a></li><li><a href=#try-the-full-example-in-playground-1>Try the full example in Playground</a></li></ul></li><li><a href=#debuggingwordcount-example>DebuggingWordCount example</a><ul><li><a href=#logging>Logging</a><ul><li><a href=#direct-runner>Direct Runner</a></li><li><a href=#cloud-dataflow-runner>Cloud Dataflow Runner</a></li><li><a href=#apache-spark-runner>Apache Spark Runner</a></li><li><a href=#apache-flink-runner>Apache Flink Runner</a></li><li><a href=#apache-nemo-runner>Apache Nemo Runner</a></li></ul></li><li><a href=#testing-your-pipeline-with-asserts>Testing your pipeline with asserts</a></li><li><a href=#try-the-full-example-in-playground-2>Try the full example in Playground</a></li></ul></li><li><a href=#windowedwordcount-example>WindowedWordCount example</a><ul><li><a href=#unbounded-and-bounded-datasets>Unbounded and bounded datasets</a></li><li><a href=#adding-timestamps-to-data>Adding timestamps to data</a></li><li><a href=#windowing>Windowing</a></li><li><a href=#reusing-ptransforms-over-windowed-pcollections>Reusing PTransforms over windowed PCollections</a></li></ul></li><li><a href=#streamingwordcount-example>StreamingWordCount example</a><ul><li><a href=#reading-an-unbounded-dataset>Reading an unbounded dataset</a></li><li><a href=#writing-unbounded-results>Writing unbounded results</a></li></ul></li><li><a href=#next-steps>Next Steps</a></li></ul></nav><nav 
class=language-switcher><strong>Adapt for:</strong><ul><li data-value=java class=active>Java SDK</li><li data-value=py>Python SDK</li><li data-value=go>Go SDK</li></ul></nav><p>The WordCount examples demonstrate how to set up a processing pipeline that can |
| read text, tokenize the text lines into individual words, and perform a |
| frequency count on each of those words. The Beam SDKs contain a series of four |
| successively more detailed WordCount examples that build on each other. The |
| input text for all the examples is a set of Shakespeare’s texts.</p><p>Each WordCount example introduces different concepts in the Beam programming |
| model. Begin by understanding MinimalWordCount, the simplest of the examples. |
| Once you are comfortable with the basic principles of building a pipeline, |
| move on to the other examples to learn more concepts.</p><ul><li><strong>MinimalWordCount</strong> demonstrates the basic principles involved in building a |
| pipeline.</li><li><strong>WordCount</strong> introduces some of the more common best practices in creating |
| reusable and maintainable pipelines.</li><li><strong>DebuggingWordCount</strong> introduces logging and debugging practices.</li><li><strong>WindowedWordCount</strong> demonstrates how you can use Beam’s programming model |
| to handle both bounded and unbounded datasets.</li></ul><h2 id=minimalwordcount-example>MinimalWordCount example</h2><p>MinimalWordCount demonstrates a simple pipeline that uses the Direct Runner to |
| read from a text file, apply transforms to tokenize and count the words, and |
| write the data to an output text file.</p><p class="language-java language-go">This example hard-codes the locations for its input and output files and doesn’t |
| perform any error checking; it is intended only to show you the “bare bones” of |
| creating a Beam pipeline. This lack of parameterization makes this particular |
| pipeline less portable across different runners than standard Beam pipelines. In |
| later examples, we will parameterize the pipeline’s input and output locations and |
| show other best practices.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>$</span> <span class=n>mvn</span> <span class=n>compile</span> <span class=n>exec</span><span class=o>:</span><span class=n>java</span> <span class=o>-</span><span class=n>Dexec</span><span class=o>.</span><span class=na>mainClass</span><span class=o>=</span><span class=n>org</span><span class=o>.</span><span class=na>apache</span><span class=o>.</span><span class=na>beam</span><span class=o>.</span><span class=na>examples</span><span class=o>.</span><span class=na>MinimalWordCount</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>python</span> <span class=o>-</span><span class=n>m</span> <span class=n>apache_beam</span><span class=o>.</span><span class=n>examples</span><span class=o>.</span><span class=n>wordcount_minimal</span> <span class=o>--</span><span class=nb>input</span> <span class=n>YOUR_INPUT_FILE</span> <span class=o>--</span><span class=n>output</span> <span class=n>counts</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span 
class=cl><span class=err>$</span> <span class=k>go</span> <span class=nx>install</span> <span class=nx>github</span><span class=p>.</span><span class=nx>com</span><span class=o>/</span><span class=nx>apache</span><span class=o>/</span><span class=nx>beam</span><span class=o>/</span><span class=nx>sdks</span><span class=o>/</span><span class=nx>v2</span><span class=o>/</span><span class=k>go</span><span class=o>/</span><span class=nx>examples</span><span class=o>/</span><span class=nx>minimal_wordcount</span> |
| </span></span><span class=line><span class=cl><span class=err>$</span> <span class=nx>minimal_wordcount</span></span></span></code></pre></div></div></div><p class=language-java>To view the full code in Java, see |
| <strong><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java>MinimalWordCount</a>.</strong></p><p class=language-py>To view the full code in Python, see |
| <strong><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_minimal.py>wordcount_minimal.py</a>.</strong></p><p class=language-go>To view the full code in Go, see |
| <strong><a href=https://github.com/apache/beam/blob/master/sdks/go/examples/minimal_wordcount/minimal_wordcount.go>minimal_wordcount.go</a>.</strong></p><p><strong>Key Concepts:</strong></p><ul><li>Creating the Pipeline</li><li>Applying transforms to the Pipeline</li><li>Reading input (in this example: reading text files)</li><li>Applying ParDo transforms</li><li>Applying SDK-provided transforms (in this example: Count)</li><li>Writing output (in this example: writing to a text file)</li><li>Running the Pipeline</li></ul><p>The following sections explain these concepts in detail, using the relevant code |
| excerpts from the MinimalWordCount pipeline.</p><h3 id=creating-the-pipeline>Creating the pipeline</h3><p class="language-java language-py">In this example, the code first creates a <code>PipelineOptions</code> object. This object |
| lets us set various options for our pipeline, such as the pipeline runner that |
| will execute it and any runner-specific configuration required by the |
| chosen runner. In this example, we set these options programmatically, but more |
| often, command-line arguments are used to set <code>PipelineOptions</code>.</p><p class="language-java language-py">You can specify a runner for executing your pipeline, such as the |
| <code>DataflowRunner</code> or <code>SparkRunner</code>. If you don’t specify a runner, your |
| pipeline executes locally using the <code>DirectRunner</code>. In the next |
| sections, we will specify the pipeline’s runner.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl> <span class=c1>// Create a PipelineOptions object. This object lets us set various execution |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// options for our pipeline, such as the runner you wish to use. This example |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// will run with the DirectRunner by default, based on the class path configured |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=c1>// in its dependencies. |
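// (Not part of MinimalWordCount.) A pipeline that reads its options, such as
// --runner, from the command line would typically build them from the
// arguments passed to main(String[] args), for example:
//   PipelineOptions options =
//       PipelineOptionsFactory.fromArgs(args).withValidation().create();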
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=n>PipelineOptions</span> <span class=n>options</span> <span class=o>=</span> <span class=n>PipelineOptionsFactory</span><span class=o>.</span><span class=na>create</span><span class=o>();</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam.options.pipeline_options</span> <span class=kn>import</span> <span class=n>PipelineOptions</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=n>input_file</span> <span class=o>=</span> <span class=s1>'gs://dataflow-samples/shakespeare/kinglear.txt'</span> |
| </span></span><span class=line><span class=cl><span class=n>output_path</span> <span class=o>=</span> <span class=s1>'gs://my-bucket/counts.txt'</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=n>beam_options</span> <span class=o>=</span> <span class=n>PipelineOptions</span><span class=p>(</span> |
| </span></span><span class=line><span class=cl> <span class=n>runner</span><span class=o>=</span><span class=s1>'DataflowRunner'</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>project</span><span class=o>=</span><span class=s1>'my-project-id'</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>job_name</span><span class=o>=</span><span class=s1>'unique-job-name'</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>temp_location</span><span class=o>=</span><span class=s1>'gs://my-bucket/temp'</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl><span class=p>)</span></span></span></code></pre></div></div></div><p class="language-java language-py">The next step is to create a <code>Pipeline</code> object with the options we’ve just |
| constructed. The <code>Pipeline</code> object builds up the graph of transformations to be |
| executed for that particular pipeline.</p><p class=language-go>The first step is to create a <code>Pipeline</code> object, which builds up the graph of |
| transformations to be executed for that particular pipeline. Calling <code>p.Root()</code> returns the pipeline’s root <code>Scope</code>, to which you apply transforms. |
| The scope allows grouping into composite transforms.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>Pipeline</span> <span class=n>p</span> <span class=o>=</span> <span class=n>Pipeline</span><span class=o>.</span><span class=na>create</span><span class=o>(</span><span class=n>options</span><span class=o>);</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>pipeline</span> <span class=o>=</span> <span class=n>beam</span><span class=o>.</span><span class=n>Pipeline</span><span class=p>(</span><span class=n>options</span><span class=o>=</span><span class=n>beam_options</span><span class=p>)</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>p</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>NewPipeline</span><span class=p>()</span> |
</span></span><span class=line><span class=cl><span class=nx>s</span> <span class=o>:=</span> <span class=nx>p</span><span class=p>.</span><span class=nf>Root</span><span class=p>()</span></span></span></code></pre></div></div></div><h3 id=applying-pipeline-transforms>Applying pipeline transforms</h3><p>The MinimalWordCount pipeline contains several transforms to read data into the
pipeline, manipulate or otherwise transform the data, and write out the results.
A transform can consist of a single operation, or it can contain multiple
nested transforms (in which case it is a <a href=/documentation/programming-guide#composite-transforms>composite transform</a>).</p><p>Each transform takes some kind of input data and produces some output data. The
input and output data are often represented by the SDK class <code>PCollection</code>.
<code>PCollection</code> is a special class, provided by the Beam SDK, that you can use to
represent a dataset of virtually any size, including unbounded datasets.</p><img src=/images/wordcount-pipeline.svg width=800px alt="The MinimalWordCount pipeline data flow."><p><em>Figure 1: The MinimalWordCount pipeline data flow.</em></p><p>The MinimalWordCount pipeline contains five transforms:</p><ol><li>A text file <code>Read</code> transform is applied to the <code>Pipeline</code> object itself, and
produces a <code>PCollection</code> as output. Each element in the output <code>PCollection</code>
represents one line of text from the input file. This example uses input
data stored in a publicly accessible Google Cloud Storage bucket (“gs://”).</li></ol><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>TextIO</span><span class=o>.</span><span class=na>read</span><span class=o>().</span><span class=na>from</span><span class=o>(</span><span class=s>"gs://apache-beam-samples/shakespeare/*"</span><span class=o>))</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>pipeline</span>
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromText</span><span class=p>(</span><span class=n>input_file</span><span class=p>)</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>lines</span> <span class=o>:=</span> <span class=nx>textio</span><span class=p>.</span><span class=nf>Read</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=s>"gs://apache-beam-samples/shakespeare/*"</span><span class=p>)</span></span></span></code></pre></div></div></div><ol start=2><li>This transform splits each line of text into individual words. Its input is the
<code>PCollection&lt;String&gt;</code> of text lines generated by the previous <code>Read</code>
transform, and its output is a new <code>PCollection&lt;String&gt;</code> in which each element
is an individual word in Shakespeare’s collected texts.
As an alternative, you could use a
<a href=/documentation/programming-guide/#pardo>ParDo</a>
transform that invokes a <code>DoFn</code> (defined inline as an anonymous class) to
tokenize each line of text into individual words.</li></ol><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>"ExtractWords"</span><span class=o>,</span> <span class=n>FlatMapElements</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>into</span><span class=o>(</span><span class=n>TypeDescriptors</span><span class=o>.</span><span class=na>strings</span><span class=o>())</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>via</span><span class=o>((</span><span class=n>String</span> <span class=n>line</span><span class=o>)</span> <span class=o>-></span> <span class=n>Arrays</span><span class=o>.</span><span class=na>asList</span><span class=o>(</span><span class=n>line</span><span class=o>.</span><span class=na>split</span><span class=o>(</span><span class=s>"[^\\p{L}]+"</span><span class=o>))))</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># The FlatMap transform is a simplified version of ParDo.</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=s1>'ExtractWords'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>FlatMap</span><span class=p>(</span><span class=k>lambda</span> <span class=n>x</span><span class=p>:</span> <span class=n>re</span><span class=o>.</span><span class=n>findall</span><span class=p>(</span><span class=sa>r</span><span class=s1>'[A-Za-z</span><span class=se>\'</span><span class=s1>]+'</span><span class=p>,</span> <span class=n>x</span><span class=p>))</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>words</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>ParDo</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=kd>func</span><span class=p>(</span><span class=nx>line</span> <span class=kt>string</span><span class=p>,</span> <span class=nx>emit</span> <span class=kd>func</span><span class=p>(</span><span class=kt>string</span><span class=p>))</span> <span class=p>{</span>
</span></span><span class=line><span class=cl>    <span class=k>for</span> <span class=nx>_</span><span class=p>,</span> <span class=nx>word</span> <span class=o>:=</span> <span class=k>range</span> <span class=nx>wordRE</span><span class=p>.</span><span class=nf>FindAllString</span><span class=p>(</span><span class=nx>line</span><span class=p>,</span> <span class=o>-</span><span class=mi>1</span><span class=p>)</span> <span class=p>{</span>
</span></span><span class=line><span class=cl>        <span class=nf>emit</span><span class=p>(</span><span class=nx>word</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=p>}</span>
</span></span><span class=line><span class=cl><span class=p>},</span> <span class=nx>lines</span><span class=p>)</span></span></span></code></pre></div></div></div><ol start=3><li><p>The SDK-provided <code>Count</code> transform is a generic transform that takes a
<code>PCollection</code> of any type and returns a <code>PCollection</code> of key/value pairs.
Each key represents a unique element from the input collection, and each
value represents the number of times that key appeared in the input
collection.</p><p>In this pipeline, the input for <code>Count</code> is the <code>PCollection</code> of individual
words generated by the previous transform, and the output is a <code>PCollection</code>
of key/value pairs where each key represents a unique word in the text and
the associated value is the occurrence count for that word.</p></li></ol><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Count</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>perElement</span><span class=o>())</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>combiners</span><span class=o>.</span><span class=n>Count</span><span class=o>.</span><span class=n>PerElement</span><span class=p>()</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>counted</span> <span class=o>:=</span> <span class=nx>stats</span><span class=p>.</span><span class=nf>Count</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=nx>words</span><span class=p>)</span></span></span></code></pre></div></div></div><ol start=4><li><p>The next transform formats each of the key/value pairs of unique words and
occurrence counts into a printable string suitable for writing to an output
file.</p><p>The map transform is a higher-level composite transform that encapsulates a
simple <code>ParDo</code>. For each element in the input <code>PCollection</code>, the map
transform applies a function that produces exactly one output element.</p></li></ol><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>"FormatResults"</span><span class=o>,</span> <span class=n>MapElements</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>into</span><span class=o>(</span><span class=n>TypeDescriptors</span><span class=o>.</span><span class=na>strings</span><span class=o>())</span>
</span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>via</span><span class=o>((</span><span class=n>KV</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Long</span><span class=o>></span> <span class=n>wordCount</span><span class=o>)</span> <span class=o>-></span> <span class=n>wordCount</span><span class=o>.</span><span class=na>getKey</span><span class=o>()</span> <span class=o>+</span> <span class=s>": "</span> <span class=o>+</span> <span class=n>wordCount</span><span class=o>.</span><span class=na>getValue</span><span class=o>()))</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>MapTuple</span><span class=p>(</span><span class=k>lambda</span> <span class=n>word</span><span class=p>,</span> <span class=n>count</span><span class=p>:</span> <span class=s1>'</span><span class=si>%s</span><span class=s1>: </span><span class=si>%s</span><span class=s1>'</span> <span class=o>%</span> <span class=p>(</span><span class=n>word</span><span class=p>,</span> <span class=n>count</span><span class=p>))</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>formatted</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>ParDo</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=kd>func</span><span class=p>(</span><span class=nx>w</span> <span class=kt>string</span><span class=p>,</span> <span class=nx>c</span> <span class=kt>int</span><span class=p>)</span> <span class=kt>string</span> <span class=p>{</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=nx>fmt</span><span class=p>.</span><span class=nf>Sprintf</span><span class=p>(</span><span class=s>"%s: %v"</span><span class=p>,</span> <span class=nx>w</span><span class=p>,</span> <span class=nx>c</span><span class=p>)</span>
</span></span><span class=line><span class=cl><span class=p>},</span> <span class=nx>counted</span><span class=p>)</span></span></span></code></pre></div></div></div><ol start=5><li>A text file write transform takes the final <code>PCollection</code> of
formatted strings as input and writes each element to an output text file.
Each element in the input <code>PCollection</code> represents one line of text in the
resulting output file.</li></ol><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>TextIO</span><span class=o>.</span><span class=na>write</span><span class=o>().</span><span class=na>to</span><span class=o>(</span><span class=s>"wordcounts"</span><span class=o>));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToText</span><span class=p>(</span><span class=n>output_path</span><span class=p>)</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>textio</span><span class=p>.</span><span class=nf>Write</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=s>"wordcounts.txt"</span><span class=p>,</span> <span class=nx>formatted</span><span class=p>)</span></span></span></code></pre></div></div></div><p class="language-java language-py">Note that the <code>Write</code> transform produces a trivial result value of type <code>PDone</code>,
which in this case is ignored.</p><p class=language-go>Note that the <code>Write</code> transform returns no PCollections.</p><h3 id=running-the-pipeline>Running the pipeline</h3><p class="language-java language-py">Run the pipeline by calling the <code>run</code> method, which sends your pipeline to be
executed by the pipeline runner that you specified in your <code>PipelineOptions</code>.</p><p class=language-go>Run the pipeline by passing it to a runner.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>p</span><span class=o>.</span><span class=na>run</span><span class=o>().</span><span class=na>waitUntilFinish</span><span class=o>();</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>with</span> <span class=n>beam</span><span class=o>.</span><span class=n>Pipeline</span><span class=p>(</span><span class=o>...</span><span class=p>)</span> <span class=k>as</span> <span class=n>p</span><span class=p>:</span>
</span></span><span class=line><span class=cl>    <span class=o>...</span>  <span class=c1># Pipeline construction goes here.</span>
</span></span><span class=line><span class=cl><span class=c1># p.run() and wait_until_finish() are called automatically when the with block exits.</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>direct</span><span class=p>.</span><span class=nf>Execute</span><span class=p>(</span><span class=nx>context</span><span class=p>.</span><span class=nf>Background</span><span class=p>(),</span> <span class=nx>p</span><span class=p>)</span></span></span></code></pre></div></div></div><p class="language-java language-py">Note that the <code>run</code> method is asynchronous. For a blocking execution, call the
<span class=language-java><code>waitUntilFinish</code></span>
<span class=language-py><code>wait_until_finish</code></span> method on the result object
returned by the call to <code>run</code>.</p><h3 id=try-the-full-example-in-playground>Try the full example in Playground</h3><div class=playground-wrapper><div class=playground-snippets><div class="language-java playground-snippet" data-sdk=java data-path=SDK_JAVA_MinimalWordCount></div><div class="language-py playground-snippet" data-sdk=python data-path=SDK_PYTHON_WordCountMinimal></div><div class="language-go playground-snippet" data-sdk=go data-path=SDK_GO_MinimalWordCount></div></div><div class="code-snippet code-snippet-playground" data-src="https://play.beam.apache.org/embedded?editable=1&examples=%5b%7b%22path%22%3a%22SDK_JAVA_MinimalWordCount%22%2c%22sdk%22%3a%22java%22%7d%2c%7b%22path%22%3a%22SDK_PYTHON_WordCountMinimal%22%2c%22sdk%22%3a%22python%22%7d%2c%7b%22path%22%3a%22SDK_GO_MinimalWordCount%22%2c%22sdk%22%3a%22go%22%7d%5d" data-width=100% data-height=700px></div></div><h2 id=wordcount-example>WordCount example</h2><p>This WordCount example introduces a few recommended programming practices that
can make your pipeline easier to read, write, and maintain. Although they are not
strictly required, they can make your pipeline’s execution more flexible, make your
pipeline easier to test, and make its code reusable.</p><p>This section assumes that you have a good understanding of the basic concepts in
building a pipeline. If you feel that you aren’t at that point yet, read the
preceding section, <a href=#minimalwordcount-example>MinimalWordCount</a>.</p><p><strong>To run this example in Java:</strong></p><p>Set up your development environment and generate the Maven archetype as
described in the <a href=/get-started/quickstart-java/>Java WordCount quickstart</a>.
Then run the pipeline with one of the runners:</p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=&lt;flink master&gt; --filesToStage=target/word-count-beam-bundled-0.1.jar \
--inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

# You can monitor the running job by visiting the Flink dashboard at http://&lt;flink master&gt;:8081</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
-Pdataflow-runner</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
--runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
--runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts</code></pre></div></div><p>To view the full code in Java, see
<strong><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java>WordCount</a>.</strong></p><p><strong>To run this example in Python:</strong></p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>python -m apache_beam.examples.wordcount --input /path/to/inputfile \
--output /path/to/write/counts \
--runner FlinkRunner</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster># Running Beam Python on a distributed Flink cluster requires additional configuration.
# See /documentation/runners/flink/ for more information.</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>python -m apache_beam.examples.wordcount --input /path/to/inputfile \
--output /path/to/write/counts \
--runner SparkRunner</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow># As part of the initial setup, install the Google Cloud Platform-specific extra components.
pip install 'apache-beam[gcp]'
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://YOUR_GCS_BUCKET/counts \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>This runner is not yet available for the Python SDK.</code></pre></div></div><p>To view the full code in Python, see
<strong><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py>wordcount.py</a>.</strong></p><p><strong>To run this example in Go:</strong></p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
$ wordcount --input &lt;PATH_TO_INPUT_FILE&gt; --output counts</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow>$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
# As part of the initial setup, non-Linux users must install the unix package before running:
$ go get -u golang.org/x/sys/unix
$ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://&lt;your-gcs-bucket&gt;/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://&lt;your-gcs-bucket&gt;/tmp/ \
--staging_location gs://&lt;your-gcs-bucket&gt;/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>This runner is not yet available for the Go SDK.</code></pre></div></div><p>To view the full code in Go, see
| <strong><a href=https://github.com/apache/beam/blob/master/sdks/go/examples/wordcount/wordcount.go>wordcount.go</a>.</strong></p><p><strong>New Concepts:</strong></p><ul><li>Applying <code>ParDo</code> with an explicit <code>DoFn</code></li><li>Creating Composite Transforms</li><li>Using Parameterizable <code>PipelineOptions</code></li></ul><p>The following sections explain these key concepts in detail and break down the |
| pipeline code into smaller sections.</p><h3 id=specifying-explicit-dofns>Specifying explicit DoFns</h3><p class="language-java language-py">When using <code>ParDo</code> transforms, you need to specify the processing operation that |
| gets applied to each element in the input <code>PCollection</code>. You implement this processing |
| operation as a subclass of the SDK class <code>DoFn</code>. You can create the <code>DoFn</code> |
| subclasses for each <code>ParDo</code> inline, as an anonymous inner class instance, as is |
| done in the previous example (MinimalWordCount). However, it’s often a good |
| idea to define the <code>DoFn</code> at the global level, which makes it easier to unit |
| test and can make the <code>ParDo</code> code more readable.</p><p class=language-go>When using <code>ParDo</code> transforms, you need to specify the processing operation that |
| gets applied to each element in the input <code>PCollection</code>. You implement this processing |
| operation as either a named function or a struct with specially named methods, such as <code>ProcessElement</code>. You |
| can also use anonymous functions (but not closures). However, it’s often a good |
| idea to define the <code>DoFn</code> at the global level, which makes it easier to unit |
| test and can make the <code>ParDo</code> code more readable.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// In this example, ExtractWordsFn is a DoFn that is defined as a static class: |
| </span></span></span><span class=line><span class=cl><span class=c1></span> |
| </span></span><span class=line><span class=cl><span class=kd>static</span> <span class=kd>class</span> <span class=nc>ExtractWordsFn</span> <span class=kd>extends</span> <span class=n>DoFn</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>></span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=nd>@ProcessElement</span> |
| </span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>processElement</span><span class=o>(</span><span class=n>ProcessContext</span> <span class=n>c</span><span class=o>)</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> <span class=o>}</span> |
| </span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># In this example, the DoFns are defined as classes:</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=k>class</span> <span class=nc>FormatAsTextFn</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>DoFn</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>element</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl>    <span class=n>word</span><span class=p>,</span> <span class=n>count</span> <span class=o>=</span> <span class=n>element</span> |
| </span></span><span class=line><span class=cl>    <span class=k>yield</span> <span class=s1>'</span><span class=si>%s</span><span class=s1>: </span><span class=si>%s</span><span class=s1>'</span> <span class=o>%</span> <span class=p>(</span><span class=n>word</span><span class=p>,</span> <span class=n>count</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> |
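| </span></span><span class=line><span class=cl><span class=c1># Because FormatAsTextFn is defined at module level, a unit test can call its</span> |
| </span></span><span class=line><span class=cl><span class=c1># process() method directly, without building a pipeline (a minimal sketch;</span> |
| </span></span><span class=line><span class=cl><span class=c1># the input tuple is made up for illustration):</span> |
| </span></span><span class=line><span class=cl>assert list(FormatAsTextFn().process(('king', 3))) == ['king: 3'] |
| </span></span><span class=line><span class=cl> |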
| </span></span><span class=line><span class=cl><span class=n>formatted</span> <span class=o>=</span> <span class=n>counts</span> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>FormatAsTextFn</span><span class=p>())</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=c1>// In this example, extractFn is a DoFn that is defined as a function: |
| </span></span></span><span class=line><span class=cl><span class=c1></span> |
| </span></span><span class=line><span class=cl><span class=kd>func</span> <span class=nf>extractFn</span><span class=p>(</span><span class=nx>ctx</span> <span class=nx>context</span><span class=p>.</span><span class=nx>Context</span><span class=p>,</span> <span class=nx>line</span> <span class=kt>string</span><span class=p>,</span> <span class=nx>emit</span> <span class=kd>func</span><span class=p>(</span><span class=kt>string</span><span class=p>))</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl><span class=p>}</span></span></span></code></pre></div></div></div><h3 id=creating-composite-transforms>Creating composite transforms</h3><p class="language-java language-py">If you have a processing operation that consists of multiple transforms or |
| <code>ParDo</code> steps, you can create it as a subclass of <code>PTransform</code>. Creating a |
| <code>PTransform</code> subclass lets you encapsulate complex transforms, makes |
| your pipeline’s structure clearer and more modular, and makes unit testing easier.</p><p class=language-go>If you have a processing operation that consists of multiple transforms or |
| <code>ParDo</code> steps, you can use a normal Go function to encapsulate them. You can |
| also use a named subscope, created with <code>s.Scope("CountWords")</code>, to group them into a composite transform |
| that is visible in monitoring tools.</p><p class="language-java language-py">In this example, two transforms are encapsulated in the composite transform |
| <code>CountWords</code>, which is a <code>PTransform</code> subclass in Java and a function decorated with <code>@beam.ptransform_fn</code> in Python. <code>CountWords</code> contains the word-extraction step (a <code>ParDo</code> that runs <code>ExtractWordsFn</code> in Java, a <code>FlatMap</code> in Python) and |
| the SDK-provided <code>Count</code> transform.</p><p class=language-go>In this example, two transforms are encapsulated in a <code>CountWords</code> function.</p><p>When <code>CountWords</code> is defined, we specify its ultimate input and output: the |
| input is the <code>PCollection<String></code> for the extraction operation, and the output |
| is the <code>PCollection<KV<String, Long>></code> produced by the count operation.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kd>class</span> <span class=nc>CountWords</span> <span class=kd>extends</span> <span class=n>PTransform</span><span class=o><</span><span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Long</span><span class=o>>>></span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=nd>@Override</span> |
| </span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Long</span><span class=o>>></span> <span class=nf>expand</span><span class=o>(</span><span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>lines</span><span class=o>)</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=c1>// Convert lines of text into individual words. |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>words</span> <span class=o>=</span> <span class=n>lines</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span> |
| </span></span><span class=line><span class=cl> <span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>ExtractWordsFn</span><span class=o>()));</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=c1>// Count the number of times each word occurs. |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Long</span><span class=o>>></span> <span class=n>wordCounts</span> <span class=o>=</span> |
| </span></span><span class=line><span class=cl> <span class=n>words</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Count</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>perElement</span><span class=o>());</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=k>return</span> <span class=n>wordCounts</span><span class=o>;</span> |
| </span></span><span class=line><span class=cl> <span class=o>}</span> |
| </span></span><span class=line><span class=cl><span class=o>}</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kt>void</span> <span class=nf>main</span><span class=o>(</span><span class=n>String</span><span class=o>[]</span> <span class=n>args</span><span class=o>)</span> <span class=kd>throws</span> <span class=n>IOException</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=n>Pipeline</span> <span class=n>p</span> <span class=o>=</span> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=n>p</span><span class=o>.</span><span class=na>apply</span><span class=o>(...)</span> |
| </span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=k>new</span> <span class=n>CountWords</span><span class=o>())</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=nd>@beam.ptransform_fn</span> |
| </span></span><span class=line><span class=cl><span class=k>def</span> <span class=nf>CountWords</span><span class=p>(</span><span class=n>pcoll</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl> <span class=k>return</span> <span class=p>(</span> |
| </span></span><span class=line><span class=cl> <span class=n>pcoll</span> |
| </span></span><span class=line><span class=cl> <span class=c1># Convert lines of text into individual words.</span> |
| </span></span><span class=line><span class=cl> <span class=o>|</span> <span class=s1>'ExtractWords'</span> <span class=o>>></span> |
| </span></span><span class=line><span class=cl> <span class=n>beam</span><span class=o>.</span><span class=n>FlatMap</span><span class=p>(</span><span class=k>lambda</span> <span class=n>x</span><span class=p>:</span> <span class=n>re</span><span class=o>.</span><span class=n>findall</span><span class=p>(</span><span class=sa>r</span><span class=s1>'[A-Za-z</span><span class=se>\'</span><span class=s1>]+'</span><span class=p>,</span> <span class=n>x</span><span class=p>))</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=c1># Count the number of times each word occurs.</span> |
| </span></span><span class=line><span class=cl> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>combiners</span><span class=o>.</span><span class=n>Count</span><span class=o>.</span><span class=n>PerElement</span><span class=p>())</span> |
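| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=c1># A minimal test sketch for the composite, assuming Beam's standard testing</span> |
| </span></span><span class=line><span class=cl><span class=c1># utilities (the sample lines are made up): apply CountWords to an in-memory</span> |
| </span></span><span class=line><span class=cl><span class=c1># PCollection and check its output without reading or writing any files.</span> |
| </span></span><span class=line><span class=cl>from apache_beam.testing.test_pipeline import TestPipeline |
| </span></span><span class=line><span class=cl>from apache_beam.testing.util import assert_that, equal_to |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl>with TestPipeline() as test_pipeline: |
| </span></span><span class=line><span class=cl>  test_counts = ( |
| </span></span><span class=line><span class=cl>      test_pipeline |
| </span></span><span class=line><span class=cl>      | beam.Create(["It's the king", 'the king']) |
| </span></span><span class=line><span class=cl>      | CountWords()) |
| </span></span><span class=line><span class=cl>  <span class=c1># equal_to ignores order; Count.PerElement emits (word, count) tuples.</span> |
| </span></span><span class=line><span class=cl>  assert_that(test_counts, equal_to([("It's", 1), ('the', 2), ('king', 2)])) |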
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=n>counts</span> <span class=o>=</span> <span class=n>lines</span> <span class=o>|</span> <span class=n>CountWords</span><span class=p>()</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=kd>func</span> <span class=nf>CountWords</span><span class=p>(</span><span class=nx>s</span> <span class=nx>beam</span><span class=p>.</span><span class=nx>Scope</span><span class=p>,</span> <span class=nx>lines</span> <span class=nx>beam</span><span class=p>.</span><span class=nx>PCollection</span><span class=p>)</span> <span class=nx>beam</span><span class=p>.</span><span class=nx>PCollection</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=nx>s</span> <span class=p>=</span> <span class=nx>s</span><span class=p>.</span><span class=nf>Scope</span><span class=p>(</span><span class=s>"CountWords"</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=c1>// Convert lines of text into individual words. |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=nx>col</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>ParDo</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=nx>extractFn</span><span class=p>,</span> <span class=nx>lines</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=c1>// Count the number of times each word occurs. |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=k>return</span> <span class=nx>stats</span><span class=p>.</span><span class=nf>Count</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=nx>col</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl><span class=p>}</span></span></span></code></pre></div></div></div><h3 id=using-parameterizable-pipelineoptions>Using parameterizable PipelineOptions</h3><p>You can hard-code various execution options in your pipeline code. However, |
| the more common approach is to define your own configuration options and parse them from the |
| command line. Defining your configuration options on the command line makes |
| the code easier to port across different runners.</p><p class="language-java language-py">Add arguments to be processed by the command-line parser, and specify default |
| values for them. You can then access the option values in your pipeline code.</p><p class=language-go>You can use the standard <code>flag</code> package for this purpose.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kd>interface</span> <span class=nc>WordCountOptions</span> <span class=kd>extends</span> <span class=n>PipelineOptions</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=nd>@Description</span><span class=o>(</span><span class=s>"Path of the file to read from"</span><span class=o>)</span> |
| </span></span><span class=line><span class=cl> <span class=nd>@Default.String</span><span class=o>(</span><span class=s>"gs://dataflow-samples/shakespeare/kinglear.txt"</span><span class=o>)</span> |
| </span></span><span class=line><span class=cl> <span class=n>String</span> <span class=nf>getInputFile</span><span class=o>();</span> |
| </span></span><span class=line><span class=cl> <span class=kt>void</span> <span class=nf>setInputFile</span><span class=o>(</span><span class=n>String</span> <span class=n>value</span><span class=o>);</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl><span class=o>}</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kt>void</span> <span class=nf>main</span><span class=o>(</span><span class=n>String</span><span class=o>[]</span> <span class=n>args</span><span class=o>)</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=n>WordCountOptions</span> <span class=n>options</span> <span class=o>=</span> <span class=n>PipelineOptionsFactory</span><span class=o>.</span><span class=na>fromArgs</span><span class=o>(</span><span class=n>args</span><span class=o>).</span><span class=na>withValidation</span><span class=o>()</span> |
| </span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>as</span><span class=o>(</span><span class=n>WordCountOptions</span><span class=o>.</span><span class=na>class</span><span class=o>);</span> |
| </span></span><span class=line><span class=cl> <span class=n>Pipeline</span> <span class=n>p</span> <span class=o>=</span> <span class=n>Pipeline</span><span class=o>.</span><span class=na>create</span><span class=o>(</span><span class=n>options</span><span class=o>);</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
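| </span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=kn>import</span> <span class=nn>argparse</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>apache_beam</span> <span class=k>as</span> <span class=nn>beam</span> |
| </span></span><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam.options.pipeline_options</span> <span class=kn>import</span> <span class=n>PipelineOptions</span> |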
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=n>parser</span> <span class=o>=</span> <span class=n>argparse</span><span class=o>.</span><span class=n>ArgumentParser</span><span class=p>()</span> |
| </span></span><span class=line><span class=cl><span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span> |
| </span></span><span class=line><span class=cl> <span class=s1>'--input-file'</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>default</span><span class=o>=</span><span class=s1>'gs://dataflow-samples/shakespeare/kinglear.txt'</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>help</span><span class=o>=</span><span class=s1>'The file path for the input text to process.'</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl><span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span> |
| </span></span><span class=line><span class=cl> <span class=s1>'--output-path'</span><span class=p>,</span> <span class=n>required</span><span class=o>=</span><span class=kc>True</span><span class=p>,</span> <span class=n>help</span><span class=o>=</span><span class=s1>'The path prefix for output files.'</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl><span class=n>args</span><span class=p>,</span> <span class=n>beam_args</span> <span class=o>=</span> <span class=n>parser</span><span class=o>.</span><span class=n>parse_known_args</span><span class=p>()</span> |
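| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=c1># A sketch of the split, using a made-up argument list: options declared on</span> |
| </span></span><span class=line><span class=cl><span class=c1># the parser land in args (dashes become underscores in attribute names),</span> |
| </span></span><span class=line><span class=cl><span class=c1># and unrecognized ones, such as --runner, are left over for PipelineOptions.</span> |
| </span></span><span class=line><span class=cl>example_args, example_beam_args = parser.parse_known_args( |
| </span></span><span class=line><span class=cl>    ['--output-path', 'counts', '--runner', 'DirectRunner']) |
| </span></span><span class=line><span class=cl>assert example_args.output_path == 'counts' |
| </span></span><span class=line><span class=cl>assert example_beam_args == ['--runner', 'DirectRunner'] |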
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=n>beam_options</span> <span class=o>=</span> <span class=n>PipelineOptions</span><span class=p>(</span><span class=n>beam_args</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl><span class=k>with</span> <span class=n>beam</span><span class=o>.</span><span class=n>Pipeline</span><span class=p>(</span><span class=n>options</span><span class=o>=</span><span class=n>beam_options</span><span class=p>)</span> <span class=k>as</span> <span class=n>pipeline</span><span class=p>:</span> |
| </span></span><span class=line><span class=cl> <span class=n>lines</span> <span class=o>=</span> <span class=n>pipeline</span> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromText</span><span class=p>(</span><span class=n>args</span><span class=o>.</span><span class=n>input_file</span><span class=p>)</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=kd>var</span> <span class=nx>input</span> <span class=p>=</span> <span class=nx>flag</span><span class=p>.</span><span class=nf>String</span><span class=p>(</span><span class=s>"input"</span><span class=p>,</span> <span class=s>"gs://apache-beam-samples/shakespeare/kinglear.txt"</span><span class=p>,</span> <span class=s>"File(s) to read."</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=kd>func</span> <span class=nf>main</span><span class=p>()</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> <span class=nx>p</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>NewPipeline</span><span class=p>()</span> |
| </span></span><span class=line><span class=cl> <span class=nx>s</span> <span class=o>:=</span> <span class=nx>p</span><span class=p>.</span><span class=nf>Root</span><span class=p>()</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=nx>lines</span> <span class=o>:=</span> <span class=nx>textio</span><span class=p>.</span><span class=nf>Read</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=o>*</span><span class=nx>input</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span></span></span></code></pre></div></div></div><h3 id=try-the-full-example-in-playground-1>Try the full example in Playground</h3><div class=playground-wrapper><div class=playground-snippets><div class="language-java playground-snippet" data-sdk=java data-path=SDK_JAVA_WordCount></div><div class="language-py playground-snippet" data-sdk=python data-path=SDK_PYTHON_WordCount></div><div class="language-go playground-snippet" data-sdk=go data-path=SDK_GO_WordCount></div></div><div class="code-snippet code-snippet-playground" data-src="https://play.beam.apache.org/embedded?editable=1&examples=%5b%7b%22path%22%3a%22SDK_JAVA_WordCount%22%2c%22sdk%22%3a%22java%22%7d%2c%7b%22path%22%3a%22SDK_PYTHON_WordCount%22%2c%22sdk%22%3a%22python%22%7d%2c%7b%22path%22%3a%22SDK_GO_WordCount%22%2c%22sdk%22%3a%22go%22%7d%5d" data-width=100% data-height=700px></div></div><h2 id=debuggingwordcount-example>DebuggingWordCount example</h2><p>The DebuggingWordCount example demonstrates some best practices for |
| instrumenting your pipeline code.</p><p><strong>To run this example in Java:</strong></p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \ |
| -Dexec.args="--output=counts" -Pdirect-runner</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \ |
| -Dexec.args="--runner=FlinkRunner --output=counts" -Pflink-runner</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \ |
| -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \ |
| --output=/tmp/counts" -Pflink-runner |
| |
| You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \ |
| -Dexec.args="--runner=SparkRunner --output=counts" -Pspark-runner</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \ |
| -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \ |
| --project=YOUR_PROJECT --region=GCE_REGION \ |
| --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \ |
| -Pdataflow-runner</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \ |
| -Dexec.args="--runner=SamzaRunner --output=counts" -Psamza-runner</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \ |
| --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \ |
| --runner=JetRunner --jetLocalMode=3 --output=counts</code></pre></div></div><p>To view the full code in Java, see |
| <a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java>DebuggingWordCount</a>.</p><p><strong>To run this example in Python:</strong></p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>python -m apache_beam.examples.wordcount_debugging --input YOUR_INPUT_FILE --output counts</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow># As part of the initial setup, install Google Cloud Platform 
# specific extra components. |
| pip install 'apache-beam[gcp]' |
| python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \ |
| --output gs://YOUR_GCS_BUCKET/counts \ |
| --runner DataflowRunner \ |
| --project YOUR_GCP_PROJECT \ |
| --region YOUR_GCP_REGION \ |
| --temp_location gs://YOUR_GCS_BUCKET/tmp/</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>This runner is not yet available for the Python SDK.</code></pre></div></div><p>To view the full code in Python, see |
| <strong><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py>wordcount_debugging.py</a>.</strong></p><p><strong>To run this example in Go:</strong></p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount |
| $ debugging_wordcount --input <PATH_TO_INPUT_FILE> --output counts</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow>$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount |
| # As part of the initial setup, non-Linux users must install the unix package before running: |
| $ go get -u golang.org/x/sys/unix |
| $ debugging_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \ |
| --output gs://<your-gcs-bucket>/counts \ |
| --runner dataflow \ |
| --project your-gcp-project \ |
| --region your-gcp-region \ |
| --temp_location gs://<your-gcs-bucket>/tmp/ \ |
| --staging_location gs://<your-gcs-bucket>/binaries/ \ |
| --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>This runner is not yet available for the Go SDK.</code></pre></div></div><p>To view the full code in Go, see |
| <strong><a href=https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go>debugging_wordcount.go</a>.</strong></p><p><strong>New Concepts:</strong></p><ul><li>Logging</li><li>Testing your Pipeline via <code>PAssert</code></li></ul><p>The following sections explain these key concepts in detail, and break down the |
| pipeline code into smaller sections.</p><h3 id=logging>Logging</h3><p>Each runner may handle logs in its own way, so where log messages appear and which log levels are captured can differ between runners.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// This example uses .trace and .debug: |
| </span></span></span><span class=line><span class=cl><span class=c1></span> |
| </span></span><span class=line><span class=cl><span class=kd>public</span> <span class=kd>class</span> <span class=nc>DebuggingWordCount</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kd>static</span> <span class=kd>class</span> <span class=nc>FilterTextFn</span> <span class=kd>extends</span> <span class=n>DoFn</span><span class=o><</span><span class=n>KV</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Long</span><span class=o>>,</span> <span class=n>KV</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Long</span><span class=o>>></span> <span class=o>{</span> |
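| </span></span><span class=line><span class=cl> <span class=kd>private</span> <span class=kd>static</span> <span class=kd>final</span> <span class=n>Logger</span> <span class=n>LOG</span> <span class=o>=</span> <span class=n>LoggerFactory</span><span class=o>.</span><span class=na>getLogger</span><span class=o>(</span><span class=n>FilterTextFn</span><span class=o>.</span><span class=na>class</span><span class=o>);</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |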
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=nd>@ProcessElement</span> |
| </span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>processElement</span><span class=o>(</span><span class=n>ProcessContext</span> <span class=n>c</span><span class=o>)</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=k>if</span> <span class=o>(...)</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> <span class=n>LOG</span><span class=o>.</span><span class=na>debug</span><span class=o>(</span><span class=s>"Matched: "</span> <span class=o>+</span> <span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>getKey</span><span class=o>());</span> |
| </span></span><span class=line><span class=cl> <span class=o>}</span> <span class=k>else</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> <span class=n>LOG</span><span class=o>.</span><span class=na>trace</span><span class=o>(</span><span class=s>"Did not match: "</span> <span class=o>+</span> <span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>getKey</span><span class=o>());</span> |
| </span></span><span class=line><span class=cl> <span class=o>}</span> |
| </span></span><span class=line><span class=cl> <span class=o>}</span> |
| </span></span><span class=line><span class=cl> <span class=o>}</span> |
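| </span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=kn>import</span> <span class=nn>logging</span> |
| </span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>re</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=kn>import</span> <span class=nn>apache_beam</span> <span class=k>as</span> <span class=nn>beam</span> |
| </span></span><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam.metrics</span> <span class=kn>import</span> <span class=n>Metrics</span> |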
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=k>class</span> <span class=nc>FilterTextFn</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>DoFn</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl>  <span class=s2>"""A DoFn that filters for a specific key based on a regular expression."""</span> |
| </span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>pattern</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>pattern</span> <span class=o>=</span> <span class=n>pattern</span> |
| </span></span><span class=line><span class=cl>    <span class=c1># A custom metric can track values in your pipeline as it runs. Create</span> |
| </span></span><span class=line><span class=cl>    <span class=c1># custom metrics matched_words and unmatched_words.</span> |
| </span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>matched_words</span> <span class=o>=</span> <span class=n>Metrics</span><span class=o>.</span><span class=n>counter</span><span class=p>(</span><span class=bp>self</span><span class=o>.</span><span class=vm>__class__</span><span class=p>,</span> <span class=s1>'matched_words'</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>unmatched_words</span> <span class=o>=</span> <span class=n>Metrics</span><span class=o>.</span><span class=n>counter</span><span class=p>(</span><span class=bp>self</span><span class=o>.</span><span class=vm>__class__</span><span class=p>,</span> <span class=s1>'unmatched_words'</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>element</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl>    <span class=n>word</span><span class=p>,</span> <span class=n>_</span> <span class=o>=</span> <span class=n>element</span> |
| </span></span><span class=line><span class=cl>    <span class=k>if</span> <span class=n>re</span><span class=o>.</span><span class=n>match</span><span class=p>(</span><span class=bp>self</span><span class=o>.</span><span class=n>pattern</span><span class=p>,</span> <span class=n>word</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl>      <span class=c1># Log at INFO level each element we match. When executing this pipeline</span> |
| </span></span><span class=line><span class=cl>      <span class=c1># using the Dataflow service, these log lines will appear in the Cloud</span> |
| </span></span><span class=line><span class=cl>      <span class=c1># Logging UI.</span> |
| </span></span><span class=line><span class=cl>      <span class=n>logging</span><span class=o>.</span><span class=n>info</span><span class=p>(</span><span class=s1>'Matched </span><span class=si>%s</span><span class=s1>'</span><span class=p>,</span> <span class=n>word</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl>      <span class=c1># Add 1 to the custom metric counter matched_words</span> |
| </span></span><span class=line><span class=cl>      <span class=bp>self</span><span class=o>.</span><span class=n>matched_words</span><span class=o>.</span><span class=n>inc</span><span class=p>()</span> |
| </span></span><span class=line><span class=cl>      <span class=k>yield</span> <span class=n>element</span> |
| </span></span><span class=line><span class=cl>    <span class=k>else</span><span class=p>:</span> |
| </span></span><span class=line><span class=cl>      <span class=c1># Log at the "DEBUG" level each element that is not matched. Different</span> |
| </span></span><span class=line><span class=cl>      <span class=c1># log levels can be used to control the verbosity of logging, providing</span> |
| </span></span><span class=line><span class=cl>      <span class=c1># an effective mechanism to filter out less important information. Note</span> |
| </span></span><span class=line><span class=cl>      <span class=c1># that by default only "INFO" and higher level logs are emitted to Cloud</span> |
| </span></span><span class=line><span class=cl>      <span class=c1># Logging, so this log message will not be visible there.</span> |
| </span></span><span class=line><span class=cl>      <span class=n>logging</span><span class=o>.</span><span class=n>debug</span><span class=p>(</span><span class=s1>'Did not match </span><span class=si>%s</span><span class=s1>'</span><span class=p>,</span> <span class=n>word</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl>      <span class=c1># Add 1 to the custom metric counter unmatched_words</span> |
| </span></span><span class=line><span class=cl>      <span class=bp>self</span><span class=o>.</span><span class=n>unmatched_words</span><span class=o>.</span><span class=n>inc</span><span class=p>()</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=kd>type</span> <span class=nx>filterFn</span> <span class=kd>struct</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl><span class=p>}</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=kd>func</span> <span class=p>(</span><span class=nx>f</span> <span class=o>*</span><span class=nx>filterFn</span><span class=p>)</span> <span class=nf>ProcessElement</span><span class=p>(</span><span class=nx>ctx</span> <span class=nx>context</span><span class=p>.</span><span class=nx>Context</span><span class=p>,</span> <span class=nx>word</span> <span class=kt>string</span><span class=p>,</span> <span class=nx>count</span> <span class=kt>int</span><span class=p>,</span> <span class=nx>emit</span> <span class=kd>func</span><span class=p>(</span><span class=kt>string</span><span class=p>,</span> <span class=kt>int</span><span class=p>))</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=k>if</span> <span class=nx>f</span><span class=p>.</span><span class=nx>re</span><span class=p>.</span><span class=nf>MatchString</span><span class=p>(</span><span class=nx>word</span><span class=p>)</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=c1>// Log at the "INFO" level each element that we match. |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=nx>log</span><span class=p>.</span><span class=nf>Infof</span><span class=p>(</span><span class=nx>ctx</span><span class=p>,</span> <span class=s>"Matched: %v"</span><span class=p>,</span> <span class=nx>word</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=nf>emit</span><span class=p>(</span><span class=nx>word</span><span class=p>,</span> <span class=nx>count</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=p>}</span> <span class=k>else</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=c1>// Log at the "DEBUG" level each element that is not matched. |
| </span></span></span><span class=line><span class=cl><span class=c1></span> <span class=nx>log</span><span class=p>.</span><span class=nf>Debugf</span><span class=p>(</span><span class=nx>ctx</span><span class=p>,</span> <span class=s>"Did not match: %v"</span><span class=p>,</span> <span class=nx>word</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=p>}</span> |
| </span></span><span class=line><span class=cl><span class=p>}</span></span></span></code></pre></div></div></div><h4 id=direct-runner>Direct Runner</h4><p>When executing your pipeline with the <code>DirectRunner</code>, you can print log |
| messages directly to your local console. <span class=language-java>If you use |
| the Beam SDK for Java, you must add an <code>SLF4J</code> binding to your class path.</span></p><h4 id=cloud-dataflow-runner>Cloud Dataflow Runner</h4><p>When executing your pipeline with the <code>DataflowRunner</code>, you can use Cloud Logging |
| (formerly Stackdriver Logging). Cloud Logging aggregates the logs from all of your Cloud Dataflow |
| job’s workers into a single location in the Google Cloud console. You can |
| use Cloud Logging to search and access the logs from all of the workers |
| that Cloud Dataflow has started to complete your job. Logging statements in your |
| pipeline’s <code>DoFn</code> instances appear in Cloud Logging as your pipeline |
| runs.</p><p class="language-java language-py">You can also control the worker log levels. Cloud Dataflow workers that execute |
| user code are configured to log to Cloud Logging by default at “INFO” log |
| level and higher. You can override log levels for specific logging namespaces by |
| specifying: <code>--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}</code>. |
| For example, by specifying <code>--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}</code> |
| when executing a pipeline using the Cloud Dataflow service, Cloud Logging |
| will contain “DEBUG” or higher level logs for that package, in addition to |
| the default “INFO” or higher level logs for everything else.</p><p class="language-java language-py">The default Cloud Dataflow worker log level can be overridden by |
| specifying <code>--defaultWorkerLogLevel=LEVEL</code>, where <code>LEVEL</code> is one of <code>TRACE</code>, <code>DEBUG</code>, <code>INFO</code>, <code>WARN</code>, or <code>ERROR</code>. |
| For example, by specifying <code>--defaultWorkerLogLevel=DEBUG</code> when executing a |
| pipeline with the Cloud Dataflow service, Cloud Logging will contain all “DEBUG” |
| or higher level logs. Note that changing the default worker log level to TRACE |
| or DEBUG significantly increases the volume of log output.</p><h4 id=apache-spark-runner>Apache Spark Runner</h4><blockquote><p><strong>Note:</strong> This section is yet to be added. There is an open issue for this |
| (<a href=https://github.com/apache/beam/issues/18076>Issue 18076</a>).</p></blockquote><h4 id=apache-flink-runner>Apache Flink Runner</h4><blockquote><p><strong>Note:</strong> This section is yet to be added. There is an open issue for this |
| (<a href=https://github.com/apache/beam/issues/18075>Issue 18075</a>).</p></blockquote><h4 id=apache-nemo-runner>Apache Nemo Runner</h4><p>When executing your pipeline with the <code>NemoRunner</code>, most log messages are printed |
| directly to your local console. You should add an <code>SLF4J</code> binding to your class path to make |
| full use of the logs. To observe the logs on both the driver and the |
| executor sides, look in the folders created by Apache REEF. For example, |
| when you run your pipeline on the local runtime, a folder called <code>REEF_LOCAL_RUNTIME</code> |
| is created in your working directory, and the logs and the metric information can |
| all be found under that directory.</p><h3 id=testing-your-pipeline-with-asserts>Testing your pipeline with asserts</h3><p class="language-java language-py"><span class=language-java><code>PAssert</code></span><span class=language-py><code>assert_that</code></span> |
| is a set of convenient PTransforms in the style of Hamcrest’s collection |
| matchers that can be used when writing pipeline level tests to validate the |
| contents of PCollections. Asserts are best used in unit tests with small datasets.</p><p class=language-go>The <code>passert</code> package contains convenient PTransforms that can be used when |
| writing pipeline level tests to validate the contents of PCollections. Asserts |
| are best used in unit tests with small datasets.</p><p class=language-java>The following example verifies that the set of filtered words matches our |
| expected counts. The assert does not produce any output, and the pipeline only |
| succeeds if all of the expectations are met.</p><p class="language-py language-go">The following example verifies that two collections contain the same values. The |
| assert does not produce any output, and the pipeline only succeeds if all of the |
| expectations are met.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kt>void</span> <span class=nf>main</span><span class=o>(</span><span class=n>String</span><span class=o>[]</span> <span class=n>args</span><span class=o>)</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> <span class=n>List</span><span class=o><</span><span class=n>KV</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Long</span><span class=o>>></span> <span class=n>expectedResults</span> <span class=o>=</span> <span class=n>Arrays</span><span class=o>.</span><span class=na>asList</span><span class=o>(</span> |
| </span></span><span class=line><span class=cl> <span class=n>KV</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=s>"Flourish"</span><span class=o>,</span> <span class=n>3L</span><span class=o>),</span> |
| </span></span><span class=line><span class=cl> <span class=n>KV</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=s>"stomach"</span><span class=o>,</span> <span class=n>1L</span><span class=o>));</span> |
| </span></span><span class=line><span class=cl> <span class=n>PAssert</span><span class=o>.</span><span class=na>that</span><span class=o>(</span><span class=n>filteredWords</span><span class=o>).</span><span class=na>containsInAnyOrder</span><span class=o>(</span><span class=n>expectedResults</span><span class=o>);</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam.testing.util</span> <span class=kn>import</span> <span class=n>assert_that</span> |
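| </span></span><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam.testing.util</span> <span class=kn>import</span> <span class=n>equal_to</span> |
| </span></span><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam.testing.test_pipeline</span> <span class=kn>import</span> <span class=n>TestPipeline</span> |
| </span></span><span class=line><span class=cl><span class=kn>from</span> <span class=nn>apache_beam</span> <span class=kn>import</span> <span class=n>Create</span> |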
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=k>with</span> <span class=n>TestPipeline</span><span class=p>()</span> <span class=k>as</span> <span class=n>p</span><span class=p>:</span> |
| </span></span><span class=line><span class=cl> <span class=n>assert_that</span><span class=p>(</span><span class=n>p</span> <span class=o>|</span> <span class=n>Create</span><span class=p>([</span><span class=mi>1</span><span class=p>,</span> <span class=mi>2</span><span class=p>,</span> <span class=mi>3</span><span class=p>]),</span> <span class=n>equal_to</span><span class=p>([</span><span class=mi>1</span><span class=p>,</span> <span class=mi>2</span><span class=p>,</span> <span class=mi>3</span><span class=p>]))</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=o>...</span> |
| </span></span><span class=line><span class=cl><span class=nx>passert</span><span class=p>.</span><span class=nf>Equals</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=nx>formatted</span><span class=p>,</span> <span class=s>"Flourish: 3"</span><span class=p>,</span> <span class=s>"stomach: 1"</span><span class=p>)</span></span></span></code></pre></div></div></div><p class=language-java>See <a href=https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java>DebuggingWordCountTest</a> |
| for an example unit test.</p><h3 id=try-the-full-example-in-playground-2>Try the full example in Playground</h3><div class=playground-wrapper><div class=playground-snippets><div class="language-java playground-snippet" data-sdk=java data-path=SDK_JAVA_DebuggingWordCount></div><div class="language-py playground-snippet" data-sdk=python data-path=SDK_PYTHON_WordCountDebugging></div><div class="language-go playground-snippet" data-sdk=go data-path=SDK_GO_DebuggingWordCount></div></div><div class="code-snippet code-snippet-playground" data-src="https://play.beam.apache.org/embedded?editable=1&examples=%5b%7b%22path%22%3a%22SDK_JAVA_DebuggingWordCount%22%2c%22sdk%22%3a%22java%22%7d%2c%7b%22path%22%3a%22SDK_PYTHON_WordCountDebugging%22%2c%22sdk%22%3a%22python%22%7d%2c%7b%22path%22%3a%22SDK_GO_DebuggingWordCount%22%2c%22sdk%22%3a%22go%22%7d%5d" data-width=100% data-height=700px></div></div><h2 id=windowedwordcount-example>WindowedWordCount example</h2><p>The WindowedWordCount example counts words in text just as the previous |
| examples did, but introduces several advanced concepts.</p><p><strong>New Concepts:</strong></p><ul><li>Unbounded and bounded datasets</li><li>Adding timestamps to data</li><li>Windowing</li><li>Reusing PTransforms over windowed PCollections</li></ul><p>The following sections explain these key concepts in detail, and break down the |
| pipeline code into smaller sections.</p><p><strong>To run this example in Java:</strong></p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \ |
| -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \ |
| -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \ |
| -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \ |
| --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner |
| |
| You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \ |
| -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \ |
| -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \ |
| --project=YOUR_PROJECT --region=GCE_REGION \ |
| --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \ |
| -Pdataflow-runner</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \ |
| -Dexec.args="--runner=SamzaRunner --inputFile=pom.xml --output=counts" -Psamza-runner</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \ |
| --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \ |
| --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts</code></pre></div></div><p>To view the full code in Java, see |
| <strong><a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java>WindowedWordCount</a>.</strong></p><p><strong>To run this example in Python:</strong></p><p>This pipeline writes its results to the BigQuery table specified by the <code>--output_table</code> |
| parameter, using the format <code>PROJECT:DATASET.TABLE</code> or |
| <code>DATASET.TABLE</code>.</p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow># As part of the initial setup, install Google Cloud Platform specific extra components. |
| pip install 'apache-beam[gcp]' |
| python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \ |
| --output_table PROJECT:DATASET.TABLE \ |
| --runner DataflowRunner \ |
| --project YOUR_GCP_PROJECT \ |
| --temp_location gs://YOUR_GCS_BUCKET/tmp/</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>This runner is not yet available for the Python SDK.</code></pre></div></div><p>To view the full code in Python, see |
| <strong><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/windowed_wordcount.py>windowed_wordcount.py</a>.</strong></p><p><strong>To run this example in Go:</strong></p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount |
| $ windowed_wordcount --input <PATH_TO_INPUT_FILE> --output counts</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow>$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount |
| # As part of the initial setup, non-Linux users should install the unix package before running |
| $ go get -u golang.org/x/sys/unix |
| $ windowed_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \ |
| --output gs://<your-gcs-bucket>/counts \ |
| --runner dataflow \ |
| --project your-gcp-project \ |
| --temp_location gs://<your-gcs-bucket>/tmp/ \ |
| --staging_location gs://<your-gcs-bucket>/binaries/ \ |
| --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>This runner is not yet available for the Go SDK.</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>This runner is not yet available for the Go SDK.</code></pre></div></div><p>To view the full code in Go, see |
| <strong><a href=https://github.com/apache/beam/blob/master/sdks/go/examples/windowed_wordcount/windowed_wordcount.go>windowed_wordcount.go</a>.</strong></p><h3 id=unbounded-and-bounded-datasets>Unbounded and bounded datasets</h3><p>Beam allows you to create a single pipeline that can handle both bounded and |
| unbounded datasets. If your dataset has a fixed number of elements, it is a bounded |
| dataset and all of the data can be processed together. For bounded datasets, |
| the question to ask is “Do I have all of the data?” If data continuously |
| arrives (such as an endless stream of game scores in the |
| <a href=/get-started/mobile-gaming-example/>Mobile gaming example</a>), |
| it is an unbounded dataset. An unbounded dataset is never available for |
| processing all at once, so the data must be processed using a streaming |
| pipeline that runs continuously. The dataset will only be complete up to a |
| certain point, so the question to ask is “Up until what point do I have all of |
| the data?” Beam uses <a href=/documentation/programming-guide/#windowing>windowing</a> |
| to divide a continuously updating dataset into logical windows of finite size. |
| If your input is unbounded, you must use a runner that supports streaming.</p><p>If your pipeline’s input is bounded, then all downstream PCollections will also be |
| bounded. Similarly, if the input is unbounded, then all downstream PCollections |
| of the pipeline will be unbounded, though separate branches may be independently |
| bounded.</p><p>Recall that the input for this example is a set of Shakespeare’s texts, which is |
| a finite set of data. Therefore, this example reads bounded data from a text |
| file:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kt>void</span> <span class=nf>main</span><span class=o>(</span><span class=n>String</span><span class=o>[]</span> <span class=n>args</span><span class=o>)</span> <span class=kd>throws</span> <span class=n>IOException</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=n>Options</span> <span class=n>options</span> <span class=o>=</span> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> <span class=n>Pipeline</span> <span class=n>pipeline</span> <span class=o>=</span> <span class=n>Pipeline</span><span class=o>.</span><span class=na>create</span><span class=o>(</span><span class=n>options</span><span class=o>);</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>input</span> <span class=o>=</span> <span class=n>pipeline</span> |
| </span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>TextIO</span><span class=o>.</span><span class=na>read</span><span class=o>().</span><span class=na>from</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getInputFile</span><span class=o>()));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>def</span> <span class=nf>main</span><span class=p>(</span><span class=n>argv</span><span class=o>=</span><span class=kc>None</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl> <span class=n>parser</span> <span class=o>=</span> <span class=n>argparse</span><span class=o>.</span><span class=n>ArgumentParser</span><span class=p>()</span> |
| </span></span><span class=line><span class=cl> <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span><span class=s1>'--input-file'</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>dest</span><span class=o>=</span><span class=s1>'input_file'</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>default</span><span class=o>=</span><span class=s1>'/Users/home/words-example.txt'</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=n>known_args</span><span class=p>,</span> <span class=n>pipeline_args</span> <span class=o>=</span> <span class=n>parser</span><span class=o>.</span><span class=n>parse_known_args</span><span class=p>(</span><span class=n>argv</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=n>pipeline_options</span> <span class=o>=</span> <span class=n>PipelineOptions</span><span class=p>(</span><span class=n>pipeline_args</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=n>p</span> <span class=o>=</span> <span class=n>beam</span><span class=o>.</span><span class=n>Pipeline</span><span class=p>(</span><span class=n>options</span><span class=o>=</span><span class=n>pipeline_options</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=n>lines</span> <span class=o>=</span> <span class=n>p</span> <span class=o>|</span> <span class=s1>'read'</span> <span class=o>>></span> <span class=n>ReadFromText</span><span class=p>(</span><span class=n>known_args</span><span class=o>.</span><span class=n>input_file</span><span class=p>)</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=kd>func</span> <span class=nf>main</span><span class=p>()</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl> <span class=nx>p</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>NewPipeline</span><span class=p>()</span> |
| </span></span><span class=line><span class=cl> <span class=nx>s</span> <span class=o>:=</span> <span class=nx>p</span><span class=p>.</span><span class=nf>Root</span><span class=p>()</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=nx>lines</span> <span class=o>:=</span> <span class=nx>textio</span><span class=p>.</span><span class=nf>Read</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=o>*</span><span class=nx>input</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=o>...</span> |
| </span></span><span class=line><span class=cl><span class=p>}</span></span></span></code></pre></div></div></div><h3 id=adding-timestamps-to-data>Adding timestamps to data</h3><p>Each element in a <code>PCollection</code> has an associated <a href=/documentation/programming-guide/#element-timestamps>timestamp</a>. |
| The timestamp for each element is initially assigned by the source that creates |
| the <code>PCollection</code>. Some sources that create unbounded PCollections can assign |
| each new element a timestamp that corresponds to when the element was read or |
| added. You can manually assign or adjust timestamps with a <code>DoFn</code>; however, you |
| can only move timestamps forward in time.</p><p>In this example, the input is bounded. For the purpose of the example, the <code>DoFn</code> |
| named <code>AddTimestampFn</code> (invoked by <code>ParDo</code>) sets a timestamp for |
| each element in the <code>PCollection</code>.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>AddTimestampFn</span><span class=o>(</span><span class=n>minTimestamp</span><span class=o>,</span> <span class=n>maxTimestamp</span><span class=o>)));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>AddTimestampFn</span><span class=p>(</span><span class=n>min_timestamp</span><span class=p>,</span> <span class=n>max_timestamp</span><span class=p>))</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>timestampedLines</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>ParDo</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=o>&</span><span 
class=nx>addTimestampFn</span><span class=p>{</span><span class=nx>Min</span><span class=p>:</span> <span class=nx>mtime</span><span class=p>.</span><span class=nf>Now</span><span class=p>()},</span> <span class=nx>lines</span><span class=p>)</span></span></span></code></pre></div></div></div><p>Below is the code for <code>AddTimestampFn</code>, a <code>DoFn</code> invoked by <code>ParDo</code>, that sets |
| the timestamp of each data element based on the element itself. For example, if the |
| elements were log lines, this <code>ParDo</code> could parse the time out of the log string |
| and set it as the element’s timestamp. There are no timestamps inherent in the |
| works of Shakespeare, so in this case we’ve made up random timestamps just to |
| illustrate the concept. Each line of the input text will get a random associated |
| timestamp sometime in a 2-hour period.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>static</span> <span class=kd>class</span> <span class=nc>AddTimestampFn</span> <span class=kd>extends</span> <span class=n>DoFn</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>></span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=kd>private</span> <span class=kd>final</span> <span class=n>Instant</span> <span class=n>minTimestamp</span><span class=o>;</span> |
| </span></span><span class=line><span class=cl> <span class=kd>private</span> <span class=kd>final</span> <span class=n>Instant</span> <span class=n>maxTimestamp</span><span class=o>;</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=n>AddTimestampFn</span><span class=o>(</span><span class=n>Instant</span> <span class=n>minTimestamp</span><span class=o>,</span> <span class=n>Instant</span> <span class=n>maxTimestamp</span><span class=o>)</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=k>this</span><span class=o>.</span><span class=na>minTimestamp</span> <span class=o>=</span> <span class=n>minTimestamp</span><span class=o>;</span> |
| </span></span><span class=line><span class=cl> <span class=k>this</span><span class=o>.</span><span class=na>maxTimestamp</span> <span class=o>=</span> <span class=n>maxTimestamp</span><span class=o>;</span> |
| </span></span><span class=line><span class=cl> <span class=o>}</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=nd>@ProcessElement</span> |
| </span></span><span class=line><span class=cl> <span class=kd>public</span> <span class=kt>void</span> <span class=nf>processElement</span><span class=o>(</span><span class=n>ProcessContext</span> <span class=n>c</span><span class=o>)</span> <span class=o>{</span> |
| </span></span><span class=line><span class=cl> <span class=n>Instant</span> <span class=n>randomTimestamp</span> <span class=o>=</span> |
| </span></span><span class=line><span class=cl> <span class=k>new</span> <span class=n>Instant</span><span class=o>(</span> |
| </span></span><span class=line><span class=cl> <span class=n>ThreadLocalRandom</span><span class=o>.</span><span class=na>current</span><span class=o>()</span> |
| </span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>nextLong</span><span class=o>(</span><span class=n>minTimestamp</span><span class=o>.</span><span class=na>getMillis</span><span class=o>(),</span> <span class=n>maxTimestamp</span><span class=o>.</span><span class=na>getMillis</span><span class=o>()));</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=cm>/** |
| </span></span></span><span class=line><span class=cl><span class=cm> * Concept #2: Set the data element with that timestamp. |
| </span></span></span><span class=line><span class=cl><span class=cm> */</span> |
| </span></span><span class=line><span class=cl> <span class=n>c</span><span class=o>.</span><span class=na>outputWithTimestamp</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>(),</span> <span class=k>new</span> <span class=n>Instant</span><span class=o>(</span><span class=n>randomTimestamp</span><span class=o>));</span> |
| </span></span><span class=line><span class=cl> <span class=o>}</span> |
| </span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>class</span> <span class=nc>AddTimestampFn</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>DoFn</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>min_timestamp</span><span class=p>,</span> <span class=n>max_timestamp</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl> <span class=bp>self</span><span class=o>.</span><span class=n>min_timestamp</span> <span class=o>=</span> <span class=n>min_timestamp</span> |
| </span></span><span class=line><span class=cl> <span class=bp>self</span><span class=o>.</span><span class=n>max_timestamp</span> <span class=o>=</span> <span class=n>max_timestamp</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl> <span class=k>def</span> <span class=nf>process</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>element</span><span class=p>):</span> |
| </span></span><span class=line><span class=cl> <span class=k>yield</span> <span class=n>window</span><span class=o>.</span><span class=n>TimestampedValue</span><span class=p>(</span> |
| </span></span><span class=line><span class=cl> <span class=n>element</span><span class=p>,</span> |
| </span></span><span class=line><span class=cl> <span class=n>random</span><span class=o>.</span><span class=n>randint</span><span class=p>(</span><span class=bp>self</span><span class=o>.</span><span class=n>min_timestamp</span><span class=p>,</span> <span class=bp>self</span><span class=o>.</span><span class=n>max_timestamp</span><span class=p>))</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=kd>type</span> <span class=nx>addTimestampFn</span> <span class=kd>struct</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=nx>Min</span> <span class=nx>beam</span><span class=p>.</span><span class=nx>EventTime</span> <span class=s>`json:"min"`</span> |
| </span></span><span class=line><span class=cl><span class=p>}</span> |
| </span></span><span class=line><span class=cl> |
| </span></span><span class=line><span class=cl><span class=kd>func</span> <span class=p>(</span><span class=nx>f</span> <span class=o>*</span><span class=nx>addTimestampFn</span><span class=p>)</span> <span class=nf>ProcessElement</span><span class=p>(</span><span class=nx>x</span> <span class=nx>beam</span><span class=p>.</span><span class=nx>X</span><span class=p>)</span> <span class=p>(</span><span class=nx>beam</span><span class=p>.</span><span class=nx>EventTime</span><span class=p>,</span> <span class=nx>beam</span><span class=p>.</span><span class=nx>X</span><span class=p>)</span> <span class=p>{</span> |
| </span></span><span class=line><span class=cl> <span class=nx>timestamp</span> <span class=o>:=</span> <span class=nx>f</span><span class=p>.</span><span class=nx>Min</span><span class=p>.</span><span class=nf>Add</span><span class=p>(</span><span class=nx>time</span><span class=p>.</span><span class=nf>Duration</span><span class=p>(</span><span class=nx>rand</span><span class=p>.</span><span class=nf>Int63n</span><span class=p>(</span><span class=mi>2</span> <span class=o>*</span> <span class=nx>time</span><span class=p>.</span><span class=nx>Hour</span><span class=p>.</span><span class=nf>Nanoseconds</span><span class=p>())))</span> |
| </span></span><span class=line><span class=cl> <span class=k>return</span> <span class=nx>timestamp</span><span class=p>,</span> <span class=nx>x</span> |
| </span></span><span class=line><span class=cl><span class=p>}</span></span></span></code></pre></div></div></div><p class=language-go>Note that the use of the <code>beam.X</code> “type variable” allows the transform to be |
| used for any type.</p><h3 id=windowing>Windowing</h3><p>Beam uses a concept called <strong>Windowing</strong> to subdivide a <code>PCollection</code> into |
| bounded sets of elements. PTransforms that aggregate multiple elements process |
| each <code>PCollection</code> as a succession of multiple, finite windows, even though the |
| entire collection itself may be of infinite size (unbounded).</p><p>The WindowedWordCount example applies fixed-time windowing, wherein each |
| window represents a fixed time interval. The fixed window size for this example |
| defaults to 1 minute (you can change this with a command-line option).</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>windowedWords</span> <span class=o>=</span> <span class=n>input</span> |
| </span></span><span class=line><span class=cl> <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Window</span><span class=o>.<</span><span class=n>String</span><span class=o>></span><span class=n>into</span><span class=o>(</span> |
| </span></span><span class=line><span class=cl> <span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getWindowSize</span><span class=o>()))));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>windowed_words</span> <span class=o>=</span> <span class=nb>input</span> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span><span class=n>window</span><span class=o>.</span><span class=n>FixedWindows</span><span class=p>(</span><span class=mi>60</span> <span class=o>*</span> <span class=n>window_size_minutes</span><span class=p>))</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>windowedLines</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>WindowInto</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=nx>window</span><span class=p>.</span><span class=nf>NewFixedWindows</span><span class=p>(</span><span class=nx>time</span><span class=p>.</span><span class=nx>Minute</span><span class=p>),</span> <span 
class=nx>timestampedLines</span><span class=p>)</span></span></span></code></pre></div></div></div><h3 id=reusing-ptransforms-over-windowed-pcollections>Reusing PTransforms over windowed PCollections</h3><p>You can also apply existing PTransforms that were written for simple, non-windowed |
| PCollections to windowed PCollections. Aggregating transforms such as <code>CountWords</code> then compute a separate result for each window.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Long</span><span class=o>>></span> <span class=n>wordCounts</span> <span class=o>=</span> <span class=n>windowedWords</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=k>new</span> <span class=n>WordCount</span><span class=o>.</span><span class=na>CountWords</span><span class=o>());</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=n>word_counts</span> <span class=o>=</span> <span class=n>windowed_words</span> <span class=o>|</span> <span class=n>CountWords</span><span class=p>()</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl><span class=nx>counted</span> <span class=o>:=</span> <span class=nx>wordcount</span><span class=p>.</span><span class=nf>CountWords</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> 
<span class=nx>windowedLines</span><span class=p>)</span></span></span></code></pre></div></div></div><p class="language-java language-go"><h3 id=try-the-full-example-in-playground>Try the full example in Playground</h3></p><div class=playground-wrapper><div class=playground-snippets><div class="language-java playground-snippet" data-sdk=java data-path=SDK_JAVA_WindowedWordCount></div><div class="language-go playground-snippet" data-sdk=go data-path=SDK_GO_WindowedWordCount></div></div><div class="code-snippet code-snippet-playground" data-src="https://play.beam.apache.org/embedded?editable=1&examples=%5b%7b%22path%22%3a%22SDK_JAVA_WindowedWordCount%22%2c%22sdk%22%3a%22java%22%7d%2c%7b%22path%22%3a%22SDK_GO_WindowedWordCount%22%2c%22sdk%22%3a%22go%22%7d%5d" data-width=100% data-height=700px></div></div><h2 id=streamingwordcount-example>StreamingWordCount example</h2><p>The StreamingWordCount example is a streaming pipeline that reads Pub/Sub |
| messages from a Pub/Sub subscription or topic and performs a frequency count on |
| the words in each message. Like WindowedWordCount, this example applies |
| fixed-time windowing, in which each window represents a fixed time interval. The |
| fixed window size for this example is 15 seconds, so the pipeline outputs the |
| frequency count of the words seen in each 15-second window.</p><p><strong>New Concepts:</strong></p><ul><li>Reading an unbounded dataset</li><li>Writing unbounded results</li></ul><p><strong>To run this example in Java:</strong></p><blockquote><p><strong>Note:</strong> StreamingWordCount is not yet available for the Java SDK.</p></blockquote><p><strong>To run this example in Python:</strong></p><div class='runner-direct snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-direct data-lang=direct>python -m apache_beam.examples.streaming_wordcount \ |
| --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \ |
| --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \ |
| --streaming</code></pre></div></div><div class='runner-flink snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flink data-lang=flink>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-flinkCluster snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-flinkCluster data-lang=flinkCluster>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-spark snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-spark data-lang=spark>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-dataflow snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-dataflow data-lang=dataflow># As part of the initial setup, install Google Cloud Platform specific extra components. |
| pip install 'apache-beam[gcp]' |
| python -m apache_beam.examples.streaming_wordcount \ |
| --runner DataflowRunner \ |
| --project YOUR_GCP_PROJECT \ |
| --region YOUR_GCP_REGION \ |
| --temp_location gs://YOUR_GCS_BUCKET/tmp/ \ |
| --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \ |
| --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \ |
| --streaming</code></pre></div></div><div class='runner-samza snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-samza data-lang=samza>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-nemo snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-nemo data-lang=nemo>This runner is not yet available for the Python SDK.</code></pre></div></div><div class='runner-jet snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code class=language-jet data-lang=jet>This runner is not yet available for the Python SDK.</code></pre></div></div><p>To view the full code in Python, see |
| <strong><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py>streaming_wordcount.py</a>.</strong></p><p><strong>To run this example in Go:</strong></p><blockquote><p><strong>Note:</strong> StreamingWordCount is not yet available for the Go SDK. There is an open issue for this |
| (<a href=https://github.com/apache/beam/issues/18879>Issue 18879</a>).</p></blockquote><h3 id=reading-an-unbounded-dataset>Reading an unbounded dataset</h3><p>This example uses an unbounded dataset as input. The code reads Pub/Sub |
| messages from the subscription given by <code>--input_subscription</code> or, if no subscription is set, from the topic given by <code>--input_topic</code>, using |
| <a href=https://beam.apache.org/releases/pydoc/2.55.1/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub><code>beam.io.ReadFromPubSub</code></a>. By default, <code>ReadFromPubSub</code> emits each message payload as <code>bytes</code>, so the code then decodes each payload into a UTF-8 string.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl> <span class=c1>// This example is not currently available for the Beam SDK for Java. |
| </span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># Read from Pub/Sub into a PCollection.</span> |
| </span></span><span class=line><span class=cl> <span class=k>if</span> <span class=n>known_args</span><span class=o>.</span><span class=n>input_subscription</span><span class=p>:</span> |
| </span></span><span class=line><span class=cl>     <span class=n>data</span> <span class=o>=</span> <span class=n>p</span> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromPubSub</span><span class=p>(</span> |
| </span></span><span class=line><span class=cl>         <span class=n>subscription</span><span class=o>=</span><span class=n>known_args</span><span class=o>.</span><span class=n>input_subscription</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=k>else</span><span class=p>:</span> |
| </span></span><span class=line><span class=cl>     <span class=n>data</span> <span class=o>=</span> <span class=n>p</span> <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromPubSub</span><span class=p>(</span><span class=n>topic</span><span class=o>=</span><span class=n>known_args</span><span class=o>.</span><span class=n>input_topic</span><span class=p>)</span> |
| </span></span><span class=line><span class=cl> <span class=n>lines</span> <span class=o>=</span> <span class=n>data</span> <span class=o>|</span> <span class=s1>'DecodeString'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=k>lambda</span> <span class=n>d</span><span class=p>:</span> <span class=n>d</span><span class=o>.</span><span class=n>decode</span><span class=p>(</span><span class=s1>'utf-8'</span><span class=p>))</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl> <span class=c1>// This example is not currently available for the Beam SDK for Go. |
| </span></span></span></code></pre></div></div></div><h3 id=writing-unbounded-results>Writing unbounded results</h3><p>When the input is unbounded, the output <code>PCollection</code> is unbounded too, so |
| you must choose an I/O connector that can write unbounded results. Some I/Os |
| support only bounded output, while others support both bounded and unbounded |
| output.</p><p>This example streams its unbounded <code>PCollection</code> of results to |
| Pub/Sub. The code encodes each formatted result string as UTF-8 bytes, the payload type that <code>WriteToPubSub</code> expects by default, and writes them to a Pub/Sub topic |
| using <a href=https://beam.apache.org/releases/pydoc/2.55.1/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.WriteToPubSub><code>beam.io.WriteToPubSub</code></a>.</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl> <span class=c1>// This example is not currently available for the Beam SDK for Java. |
| </span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl> <span class=c1># Write to Pub/Sub</span> |
| </span></span><span class=line><span class=cl> <span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span class=n>output</span> |
| </span></span><span class=line><span class=cl>      <span class=o>|</span> <span class=s1>'EncodeString'</span> <span class=o>>></span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=k>lambda</span> <span class=n>s</span><span class=p>:</span> <span class=n>s</span><span class=o>.</span><span class=n>encode</span><span class=p>(</span><span class=s1>'utf-8'</span><span class=p>))</span> |
| </span></span><span class=line><span class=cl>      <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToPubSub</span><span class=p>(</span><span class=n>known_args</span><span class=o>.</span><span class=n>output_topic</span><span class=p>))</span></span></span></code></pre></div></div></div><div class='language-go snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-go data-lang=go><span class=line><span class=cl> <span class=c1>// This example is not currently available for the Beam SDK for Go. |
| </span></span></span></code></pre></div></div></div><h2 id=next-steps>Next Steps</h2><ul><li>Walk through the Mobile Gaming examples in the <a href=/get-started/mobile-gaming-example>Mobile Gaming Example Walkthrough</a>.</li><li>Take a self-paced tour through our <a href=/documentation/resources/learning-resources>Learning Resources</a>.</li><li>Dive into some of our favorite <a href=/get-started/resources/videos-and-podcasts>Videos and Podcasts</a>.</li><li>Join the Beam <a href=/community/contact-us>users@</a> mailing list.</li></ul><p>Please don’t hesitate to <a href=/community/contact-us>reach out</a> if you encounter any issues!</p><div class=feedback><p class=update>Last updated on 2024/04/25</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div 
class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>© |
| <a href=https://www.apache.org>The Apache Software Foundation</a> |
| | <a href=/privacy_policy>Privacy Policy</a> |
| | <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html> |