blob: a80374aeef8ff83ae36c81b27de4b1afeea2b3f5 [file] [log] [blame]
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Online Clustering</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<script type=text/javascript src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script>
<script type=text/javascript src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script>
<script type=text/javascript src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script>
<script type=text/javascript src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script>
<script type=text/javascript src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script>
<script type=text/javascript src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script>
<script type=text/javascript src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script>
<script type=text/javascript src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script>
<script type=text/javascript src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/ml/online-clustering/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","//www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script><script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class=navbar-link href=/get-started/>Get Started</a>
<a class=navbar-link href=/documentation/>Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/ml/online-clustering.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class=navbar-link href=/community/>Community</a>
<a class=navbar-link href=/contribute/>Contribute</a>
<a class=navbar-link href=/blog/>Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/ml/online-clustering.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>function showSearch(){addPlaceholder();var e,t=document.querySelector(".searchBar");t.classList.remove("disappear"),e=document.querySelector("#iconsBar"),e.classList.add("disappear")}function addPlaceholder(){$("input:text").attr("placeholder","What are you looking for?")}function endSearch(){var e,t=document.querySelector(".searchBar");t.classList.add("disappear"),e=document.querySelector("#iconsBar"),e.classList.remove("disappear")}function blockScroll(){$("body").toggleClass("fixedPosition")}function openMenu(){addPlaceholder(),blockScroll()}</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Concepts</span><ul class=section-nav-list><li><a href=/documentation/basics/>Basics of the Beam model</a></li><li><a href=/documentation/runtime/model/>How Beam executes a pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Beam programming guide</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/>Overview</a></li><li><a href=/documentation/programming-guide/#creating-a-pipeline>Pipelines</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>PCollections</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pcollections>Creating a PCollection</a></li><li><a href=/documentation/programming-guide/#pcollection-characteristics>PCollection characteristics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#applying-transforms>Applying transforms</a></li><li><span class=section-nav-list-title>Core Beam transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pardo>ParDo</a></li><li><a href=/documentation/programming-guide/#groupbykey>GroupByKey</a></li><li><a href=/documentation/programming-guide/#cogroupbykey>CoGroupByKey</a></li><li><a href=/documentation/programming-guide/#combine>Combine</a></li><li><a href=/documentation/programming-guide/#flatten>Flatten</a></li><li><a href=/documentation/programming-guide/#partition>Partition</a></li></ul></li><li><a href=/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms>Requirements for user code</a></li><li><a href=/documentation/programming-guide/#side-inputs>Side inputs</a></li><li><a href=/documentation/programming-guide/#additional-outputs>Additional outputs</a></li><li><a href=/documentation/programming-guide/#composite-transforms>Composite transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline I/O</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pipeline-io>Using I/O transforms</a></li><li><a href=/documentation/io/connectors/>I/O connectors</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>I/O connector guides</span><ul class=section-nav-list><li><a href=/documentation/io/built-in/parquet/>Apache Parquet I/O connector</a></li><li><a href=/documentation/io/built-in/hadoop/>Hadoop Input/Output Format IO</a></li><li><a href=/documentation/io/built-in/hcatalog/>HCatalog IO</a></li><li><a href=/documentation/io/built-in/google-bigquery/>Google BigQuery I/O connector</a></li><li><a href=/documentation/io/built-in/snowflake/>Snowflake I/O connector</a></li><li><a href=/documentation/io/built-in/cdap/>CDAP I/O connector</a></li><li><a href=/documentation/io/built-in/sparkreceiver/>Spark Receiver I/O connector</a></li><li><a href=/documentation/io/built-in/singlestore/>SingleStoreDB I/O connector</a></li><li><a href=/documentation/io/built-in/webapis/>Web APIs I/O connector</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Developing new I/O connectors</span><ul class=section-nav-list><li><a href=/documentation/io/developing-io-overview/>Overview: Developing connectors</a></li><li><a href=/documentation/io/developing-io-java/>Developing connectors (Java)</a></li><li><a href=/documentation/io/developing-io-python/>Developing connectors (Python)</a></li><li><a href=/documentation/io/io-standards/>I/O Standards</a></li></ul></li><li><a href=/documentation/io/testing/>Testing I/O transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Schemas</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#what-is-a-schema>What is a schema</a></li><li><a href=/documentation/programming-guide/#schemas-for-pl-types>Schemas for programming language types</a></li><li><a href=/documentation/programming-guide/#schema-definition>Schema definition</a></li><li><a href=/documentation/programming-guide/#logical-types>Logical types</a></li><li><a href=/documentation/programming-guide/#creating-schemas>Creating schemas</a></li><li><a href=/documentation/programming-guide/#using-schemas>Using schemas</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data encoding and type safety</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#data-encoding-and-type-safety>Data encoding basics</a></li><li><a href=/documentation/programming-guide/#specifying-coders>Specifying coders</a></li><li><a href=/documentation/programming-guide/#default-coders-and-the-coderregistry>Default coders and the CoderRegistry</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Windowing</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#windowing>Windowing basics</a></li><li><a href=/documentation/programming-guide/#provided-windowing-functions>Provided windowing functions</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Setting your PCollection’s windowing function</a></li><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Watermarks and late data</a></li><li><a href=/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements>Adding timestamps to a PCollection’s elements</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Triggers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#triggers>Trigger basics</a></li><li><a href=/documentation/programming-guide/#event-time-triggers>Event time triggers and the default trigger</a></li><li><a href=/documentation/programming-guide/#processing-time-triggers>Processing time triggers</a></li><li><a href=/documentation/programming-guide/#data-driven-triggers>Data-driven triggers</a></li><li><a href=/documentation/programming-guide/#setting-a-trigger>Setting a trigger</a></li><li><a href=/documentation/programming-guide/#composite-triggers>Composite triggers</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Metrics</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#metrics>Metrics basics</a></li><li><a href=/documentation/programming-guide/#types-of-metrics>Types of metrics</a></li><li><a href=/documentation/programming-guide/#querying-metrics>Querying metrics</a></li><li><a href=/documentation/programming-guide/#using-metrics>Using metrics in pipeline</a></li><li><a href=/documentation/programming-guide/#export-metrics>Export metrics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>State and Timers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#types-of-state>Types of state</a></li><li><a href=/documentation/programming-guide/#deferred-state-reads>Deferred state reads</a></li><li><a href=/documentation/programming-guide/#timers>Timers</a></li><li><a href=/documentation/programming-guide/#garbage-collecting-state>Garbage collecting state</a></li><li><a href=/documentation/programming-guide/#state-timers-examples>State and timers examples</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Splittable DoFns</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#sdf-basics>Basics</a></li><li><a href=/documentation/programming-guide/#sizing-and-progress>Sizing and progress</a></li><li><a href=/documentation/programming-guide/#user-initiated-checkpoint>User-initiated checkpoint</a></li><li><a href=/documentation/programming-guide/#runner-initiated-split>Runner initiated split</a></li><li><a href=/documentation/programming-guide/#watermark-estimation>Watermark estimation</a></li><li><a href=/documentation/programming-guide/#truncating-during-drain>Truncating during drain</a></li><li><a href=/documentation/programming-guide/#bundle-finalization>Bundle finalization</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Multi-language Pipelines</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#create-x-lang-transforms>Creating cross-language transforms</a></li><li><a href=/documentation/programming-guide/#use-x-lang-transforms>Using cross-language transforms</a></li><li><a href=/documentation/programming-guide/#x-lang-transform-runner-support>Runner Support</a></li></ul></li><li><a href=/documentation/programming-guide/#batched-dofns>Batched DoFns</a></li><li><a href=/documentation/programming-guide/#transform-service>Transform service</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline development lifecycle</span><ul class=section-nav-list><li><a href=/documentation/pipelines/design-your-pipeline/>Design Your Pipeline</a></li><li><a href=/documentation/pipelines/create-your-pipeline/>Create Your Pipeline</a></li><li><a href=/documentation/pipelines/test-your-pipeline/>Test Your Pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Common pipeline patterns</span><ul class=section-nav-list><li><a href=/documentation/patterns/overview/>Overview</a></li><li><a href=/documentation/patterns/file-processing/>File processing</a></li><li><a href=/documentation/patterns/side-inputs/>Side inputs</a></li><li><a href=/documentation/patterns/pipeline-options/>Pipeline options</a></li><li><a href=/documentation/patterns/custom-io/>Custom I/O</a></li><li><a href=/documentation/patterns/custom-windows/>Custom windows</a></li><li><a href=/documentation/patterns/bigqueryio/>BigQueryIO</a></li><li><a href=/documentation/patterns/ai-platform/>AI Platform</a></li><li><a href=/documentation/patterns/schema/>Schema</a></li><li><a href=/documentation/patterns/bqml/>BigQuery ML</a></li><li><a href=/documentation/patterns/grouping-elements-for-efficient-external-service-calls/>Grouping elements for efficient external service calls</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>AI/ML pipelines</span><ul class=section-nav-list><li><a href=/documentation/ml/overview/>Get started with AI/ML</a></li><li><a href=/documentation/ml/about-ml/>About Beam ML</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Prediction and inference</span><ul class=section-nav-list><li><a href=/documentation/ml/inference-overview/>Overview</a></li><li><a href=/documentation/ml/multi-model-pipelines/>Build a pipeline with multiple models</a></li><li><a href=/documentation/ml/tensorrt-runinference>Build a custom model handler with TensorRT</a></li><li><a href=/documentation/ml/large-language-modeling>Use LLM inference</a></li><li><a href=/documentation/ml/multi-language-inference/>Build a multi-language inference pipeline</a></li><li><a href=/documentation/ml/side-input-updates/>Update your model in production</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data processing</span><ul class=section-nav-list><li><a href=/documentation/ml/preprocess-data/>Preprocess data</a></li><li><a href=/documentation/ml/data-processing/>Explore your data</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Workflow orchestration</span><ul class=section-nav-list><li><a href=/documentation/ml/orchestration/>Use ML-OPS workflow orchestrators</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Model training</span><ul class=section-nav-list><li><a href=/documentation/ml/per-entity-training>Per-entity training</a></li><li><a href=/documentation/ml/online-clustering/>Online clustering</a></li><li><a href=/documentation/ml/model-evaluation/>ML model evaluation</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Use cases</span><ul class=section-nav-list><li><a href=/documentation/ml/anomaly-detection/>Build an anomaly detection pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Reference</span><ul class=section-nav-list><li><a href=/documentation/ml/runinference-metrics/>RunInference metrics</a></li><li><a href=/documentation/ml/model-evaluation/>Model validation</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Runtime systems</span><ul class=section-nav-list><li><a href=/documentation/runtime/environments/>Container environments</a></li><li><a href=/documentation/runtime/resource-hints/>Resource hints</a></li><li><a href=/documentation/runtime/sdk-harness-config/>SDK Harness Configuration</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transform catalog</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Python</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Enrichment</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/enrichment/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-bigtable/>Bigtable example</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-vertexai/>Vertex AI Feature Store examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/python/elementwise/flatmap/>FlatMap</a></li><li><a href=/documentation/transforms/python/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/python/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/python/elementwise/map/>Map</a></li><li><a href=/documentation/transforms/python/elementwise/mltransform/>MLTransform</a></li><li><a href=/documentation/transforms/python/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/python/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/python/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/python/elementwise/reify/>Reify</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>RunInference</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/runinference/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-pytorch/>PyTorch examples</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-sklearn/>Sklearn examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/python/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/python/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/python/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/python/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/combineglobally/>CombineGlobally</a></li><li><a href=/documentation/transforms/python/aggregation/combineperkey/>CombinePerKey</a></li><li><a href=/documentation/transforms/python/aggregation/combinevalues/>CombineValues</a></li><li><a href=/documentation/transforms/python/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/python/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/python/aggregation/groupby/>GroupBy</a></li><li><a href=/documentation/transforms/python/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/python/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/python/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/python/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/python/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/python/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/python/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/python/aggregation/top/>Top</a></li><li><a href=/documentation/transforms/python/aggregation/tolist/>ToList</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/other/create/>Create</a></li><li><a href=/documentation/transforms/python/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/python/other/reshuffle/>Reshuffle</a></li><li><a href=/documentation/transforms/python/other/windowinto/>WindowInto</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Java</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/java/elementwise/flatmapelements/>FlatMapElements</a></li><li><a href=/documentation/transforms/java/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/java/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/java/elementwise/mapelements/>MapElements</a></li><li><a href=/documentation/transforms/java/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/java/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/java/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/java/elementwise/reify/>Reify</a></li><li><a href=/documentation/transforms/java/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/java/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/java/elementwise/withkeys/>WithKeys</a></li><li><a href=/documentation/transforms/java/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/java/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/java/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/combine/>Combine</a></li><li><a href=/documentation/transforms/java/aggregation/combinewithcontext/>CombineWithContext</a></li><li><a href=/documentation/transforms/java/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/java/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/java/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/java/aggregation/hllcount/>HllCount</a></li><li><a href=/documentation/transforms/java/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/java/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/java/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/java/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/java/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/java/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/java/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/other/create/>Create</a></li><li><a href=/documentation/transforms/java/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/java/other/passert/>PAssert</a></li><li><a href=/documentation/transforms/java/other/view/>View</a></li><li><a href=/documentation/transforms/java/other/window/>Window</a></li></ul></li></ul></li></ul></li><li><a href=/documentation/glossary/>Glossary</a></li><li><a href=https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam>Beam Wiki <img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#dataset-for-clustering>Dataset for Clustering</a></li><li><a href=#clustering-algorithm>Clustering Algorithm</a></li><li><a href=#ingestion-to-pubsub>Ingestion to Pub/Sub</a></li><li><a href=#run-the-pipeline>Run the Pipeline</a></li><li><a href=#clustering-on-streaming-data>Clustering on Streaming Data</a><ul><li><a href=#run-the-pipeline-1>Run the Pipeline</a></li><li><a href=#get-embedding-from-a-language-model>Get Embedding from a Language Model</a></li><li><a href=#statefulonlineclustering>StatefulOnlineClustering</a></li></ul></li></ul></nav></nav><div class="body__contained body__section-nav arrow-list arrow-list--no-mt"><h1 id=online-clustering-example>Online Clustering Example</h1><p>The online clustering example demonstrates how to set up a real-time clustering pipeline that can read text from Pub/Sub, convert the text into an embedding using a language model, and cluster the text using BIRCH.</p><h2 id=dataset-for-clustering>Dataset for Clustering</h2><p>This example uses a dataset called <a href=https://huggingface.co/datasets/emotion>emotion</a> that contains 20,000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train, validation, and test. Because it contains the text and the category (class) of the dataset, it&rsquo;s a supervised dataset. To access this dataset, use the <a href=https://huggingface.co/docs/datasets/index>Hugging Face datasets page</a>.</p><p>The following text shows examples from the train split of the dataset:</p><table><thead><tr><th style=text-align:left>Text</th><th style=text-align:center>Type of emotion</th></tr></thead><tbody><tr><td style=text-align:left>im grabbing a minute to post i feel greedy wrong</td><td style=text-align:center>Anger</td></tr><tr><td style=text-align:left>i am ever feeling nostalgic about the fireplace i will know that it is still on the property</td><td style=text-align:center>Love</td></tr><tr><td style=text-align:left>ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny</td><td style=text-align:center>Fear</td></tr><tr><td style=text-align:left>on a boat trip to denmark</td><td style=text-align:center>Joy</td></tr><tr><td style=text-align:left>i feel you know basically like a fake in the realm of science fiction</td><td style=text-align:center>Sadness</td></tr><tr><td style=text-align:left>i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations</td><td style=text-align:center>Fear</td></tr></tbody></table><h2 id=clustering-algorithm>Clustering Algorithm</h2><p>For the clustering of tweets, we use an incremental clustering algorithm called BIRCH. It stands for balanced iterative reducing and clustering using hierarchies, and it is an unsupervised data mining algorithm used to perform hierarchical clustering over particularly large datasets. An advantage of BIRCH is its ability to incrementally and dynamically cluster incoming, multi-dimensional metric data points in an attempt to produce the best quality clustering for a given set of resources (memory and time constraints).</p><h2 id=ingestion-to-pubsub>Ingestion to Pub/Sub</h2><p>The example starts by ingesting the data into <a href=https://cloud.google.com/pubsub/docs/overview>Pub/Sub</a> so that we can read the tweets from Pub/Sub while clustering. Pub/Sub is a messaging service for exchanging event data among applications and services. Streaming analytics and data integration pipelines use Pub/Sub to ingest and distribute data.</p><p>You can find the full example code for ingesting data into Pub/Sub in <a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/online_clustering/write_data_to_pubsub_pipeline/>GitHub</a></p><p>The file structure for the ingestion pipeline is shown in the following diagram:</p><pre><code>write_data_to_pubsub_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── utils.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
</code></pre><p><code>pipeline/utils.py</code> contains the code for loading the emotion dataset and two <code>beam.DoFn</code> that are used for data transformation.</p><p><code>pipeline/options.py</code> contains the pipeline options to configure the Dataflow pipeline.</p><p><code>config.py</code> defines some variables that are used multiple times, like GCP PROJECT_ID and NUM_WORKERS.</p><p><code>setup.py</code> defines the packages and requirements for the pipeline to run.</p><p><code>main.py</code> contains the pipeline code and some additional function used for running the pipeline.</p><h2 id=run-the-pipeline>Run the Pipeline</h2><p>First, install the required packages.</p><ol><li>Locally on your machine: <code>python main.py</code></li><li>On GCP for Dataflow: <code>python main.py --mode cloud</code></li></ol><p>The <code>write_data_to_pubsub_pipeline</code> contains four different transforms:</p><ol><li>Load the emotion dataset using Hugging Face datasets (for simplicity, we take samples from three classes instead of six).</li><li>Associate each piece of text with a unique identifier (UID).</li><li>Convert the text into the format that Pub/Sub expects.</li><li>Write the formatted message to Pub/Sub.</li></ol><h2 id=clustering-on-streaming-data>Clustering on Streaming Data</h2><p>After ingesting the data to Pub/Sub, examine the second pipeline, where we read the streaming message from Pub/Sub, convert the text to a embedding using a language model, and cluster the embedding using BIRCH.</p><p>You can find the full example code for all the steps mentioned previously in <a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/online_clustering/clustering_pipeline/>GitHub</a>.</p><p>The file structure for clustering_pipeline is:</p><pre><code>clustering_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── transformations.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
</code></pre><p><code>pipeline/transformations.py</code> contains the code for the different <code>beam.DoFn</code> that are used in the pipeline.</p><p><code>pipeline/options.py</code> contains the pipeline options to configure the Dataflow pipeline.</p><p><code>config.py</code> defines variables that are used multiple times, like Google Cloud PROJECT_ID and NUM_WORKERS.</p><p><code>setup.py</code> defines the packages and requirements for the pipeline to run.</p><p><code>main.py</code> contains the pipeline code and some additional functions used for running the pipeline.</p><h3 id=run-the-pipeline-1>Run the Pipeline</h3><p>Install the required packages and push data to Pub/Sub.</p><ol><li>Locally on your machine: <code>python main.py</code></li><li>On GCP for Dataflow: <code>python main.py --mode cloud</code></li></ol><p>The pipeline can be broken down into the following steps:</p><ol><li>Read the message from Pub/Sub.</li><li>Convert the Pub/Sub message into a <code>PCollection</code> of dictionaries where the key is the UID and the value is the Twitter text.</li><li>Encode the text into transformer-readable token ID integers using a tokenizer.</li><li>Use RunInference to get the vector embedding from a transformer-based language model.</li><li>Normalize the embedding for clustering.</li><li>Perform BIRCH clustering using stateful processing.</li><li>Print the texts assigned to clusters.</li></ol><p>The following code shows the first two steps of the pipeline, where a message from Pub/Sub is read and converted into a dictionary.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code> docs = (
pipeline
| &#34;Read from PubSub&#34;
&gt;&gt; ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID, with_attributes=True)
| &#34;Decode PubSubMessage&#34; &gt;&gt; beam.ParDo(Decode())
)</code></pre></div></div><p>The next sections examine three important pipeline steps:</p><ol><li>Tokenize the text.</li><li>Feed the tokenized text to get embedding from a transformer-based language model.</li><li>Perform clustering using <a href=/blog/stateful-processing/>stateful processing</a>.</li></ol><h3 id=get-embedding-from-a-language-model>Get Embedding from a Language Model</h3><p>In order cluster text data, you need to map the text into vectors of numerical values suitable for statistical analysis. This example uses a transformer-based language model called <a href=https://huggingface.co/sentence-transformers/stsb-distilbert-base>sentence-transformers/stsb-distilbert-base/stsb-distilbert-base</a>. It maps sentences and paragraphs to a 768 dimensional dense vector space, and you can use it for tasks like clustering or semantic search.</p><p>Because the language model is expecting a tokenized input instead of raw text, start by tokenizing the text. Tokenization is a preprocessing task that transforms text so that it can be fed into the model for getting predictions.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code> normalized_embedding = (
docs
| &#34;Tokenize Text&#34; &gt;&gt; beam.Map(tokenize_sentence)</code></pre></div></div><p>Here, <code>tokenize_sentence</code> is a function that takes a dictionary with a text and an ID, tokenizes the text, and returns a tuple (text, id) and the tokenized output.</p><p>Tokenized output is then passed to the language model for getting the embeddings. For getting embeddings from the language model, we use <code>RunInference()</code> from Apache Beam.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code> | &#34;Get Embedding&#34; &gt;&gt; RunInference(KeyedModelHandler(model_handler))</code></pre></div></div><p>To make better clusters, after getting the embedding for each piece of Twitter text, the embeddings are normalized.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code> | &#34;Normalize Embedding&#34; &gt;&gt; beam.ParDo(NormalizeEmbedding())</code></pre></div></div><h3 id=statefulonlineclustering>StatefulOnlineClustering</h3><p>Because the data is streaming, you need to use an iterative clustering algorithm, like BIRCH. And because the algorithm is iterative, you need a mechanism to store the previous state so that when Twitter text arrives, it can be updated. <strong>Stateful processing</strong> enables a <code>DoFn</code> to have persistent state, which can be read and written during the processing of each element. For more information about stateful processing, see <a href=/blog/stateful-processing/>Stateful processing with Apache Beam</a>.</p><p>In this example, every time a new message is read from Pub/Sub, you retrieve the existing state of the clustering model, update it, and write it back to the state.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code> clustering = (
normalized_embedding
| &#34;Map doc to key&#34; &gt;&gt; beam.Map(lambda x: (1, x))
| &#34;StatefulClustering using Birch&#34; &gt;&gt; beam.ParDo(StatefulOnlineClustering())
)</code></pre></div></div><p>Because BIRCH doesn&rsquo;t support parallelization, you need to make sure that only one worker is doing all of the stateful processing. To do that, use <code>Beam.Map</code> to associate each text to the same key <code>1</code>.</p><p><code>StatefulOnlineClustering</code> is a <code>DoFn</code> that takes an embedding of a text and updates the clustering model. To store the state, it uses the <code>ReadModifyWriteStateSpec</code> state object, which acts as a container for storage.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>class StatefulOnlineClustering(beam.DoFn):
BIRCH_MODEL_SPEC = ReadModifyWriteStateSpec(&#34;clustering_model&#34;, PickleCoder())
DATA_ITEMS_SPEC = ReadModifyWriteStateSpec(&#34;data_items&#34;, PickleCoder())
EMBEDDINGS_SPEC = ReadModifyWriteStateSpec(&#34;embeddings&#34;, PickleCoder())
UPDATE_COUNTER_SPEC = ReadModifyWriteStateSpec(&#34;update_counter&#34;, PickleCoder())</code></pre></div></div><p>This example declares four different <code>ReadModifyWriteStateSpec objects</code>:</p><ul><li><code>BIRCH_MODEL_SPEC</code> holds the state of clustering model.</li><li><code>DATA_ITEMS_SPEC</code> holds the Twitter texts seen so far.</li><li><code>EMBEDDINGS_SPEC</code> holds the normalized embeddings.</li><li><code>UPDATE_COUNTER_SPEC</code> holds the number of texts processed.</li></ul><p>These <code>ReadModifyWriteStateSpec</code> objects are passed as an additional argument to the <code>process</code> function. When a news item comes in, we retrieve the existing state of the different objects, update them, and then write them back as persistent shared state.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>def process(
self,
element,
model_state=beam.DoFn.StateParam(BIRCH_MODEL_SPEC),
collected_docs_state=beam.DoFn.StateParam(DATA_ITEMS_SPEC),
collected_embeddings_state=beam.DoFn.StateParam(EMBEDDINGS_SPEC),
update_counter_state=beam.DoFn.StateParam(UPDATE_COUNTER_SPEC),
*args,
**kwargs,
):
&#34;&#34;&#34;
Takes the embedding of a document and updates the clustering model
Args:
element: The input element to be processed.
model_state: This is the state of the clustering model. It is a stateful parameter,
which means that it will be updated after each call to the process function.
collected_docs_state: This is a stateful dictionary that stores the documents that
have been processed so far.
collected_embeddings_state: This is a dictionary of document IDs and their embeddings.
update_counter_state: This is a counter that keeps track of how many documents have been
processed.
&#34;&#34;&#34;
# 1. Initialise or load states
clustering = model_state.read() or Birch(n_clusters=None, threshold=0.7)
collected_documents = collected_docs_state.read() or {}
collected_embeddings = collected_embeddings_state.read() or {}
update_counter = update_counter_state.read() or Counter()
# 2. Extract document, add to state, and add to clustering model
_, doc = element
doc_id = doc[&#34;id&#34;]
embedding_vector = doc[&#34;embedding&#34;]
collected_embeddings[doc_id] = embedding_vector
collected_documents[doc_id] = {&#34;id&#34;: doc_id, &#34;text&#34;: doc[&#34;text&#34;]}
update_counter = len(collected_documents)
clustering.partial_fit(np.atleast_2d(embedding_vector))
# 3. Predict cluster labels of collected documents
cluster_labels = clustering.predict(
np.array(list(collected_embeddings.values())))
# 4. Write states
model_state.write(clustering)
collected_docs_state.write(collected_documents)
collected_embeddings_state.write(collected_embeddings)
update_counter_state.write(update_counter)
yield {
&#34;labels&#34;: cluster_labels,
&#34;docs&#34;: collected_documents,
&#34;id&#34;: list(collected_embeddings.keys()),
&#34;counter&#34;: update_counter,
}</code></pre></div></div><p><code>GetUpdates</code> is a <code>DoFn</code> that prints the cluster assigned to each Twitter message every time a new message arrives.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>updated_clusters = clustering | &#34;Format Update&#34; &gt;&gt; beam.ParDo(GetUpdates())</code></pre></div></div><div class=feedback><p class=update>Last updated on 2024/05/03</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>