# About Beam ML

[Pydoc](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.html#apache_beam.ml.inference.RunInference) |
[Javadoc](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/extensions/python/transforms/RunInference.html)

You can use Apache Beam to:

- Process large volumes of data, both for preprocessing and for inference.
- Experiment with your data during the exploration phase of your project.
- Scale up your data pipelines as part of your MLOps ecosystem in a production environment.
- Run your model in production on a varying data load, both in batch and streaming.

## AI/ML workloads

You can use Apache Beam for data validation, data preprocessing, model validation, and model deployment and inference.

![Overview of AI/ML building blocks and where Apache Beam can be used](/images/ml-workflows.svg)

1. Data ingestion: Incoming new data is either stored in your file system or database, or published to a messaging queue.
2. **Data validation**: After you receive your data, check its quality. For example, you might want to detect outliers and calculate standard deviations and class distributions.
3. **Data preprocessing**: After you validate your data, transform the data so that it's ready to use to train your model.
4. Model training: When your data is ready, train your AI/ML model. This step is typically repeated multiple times, depending on the quality of your trained model.
5. Model validation: Before you deploy your model, validate its performance and accuracy.
6. **Model deployment**: Deploy your model, using it to run inference on new or existing data.

To keep your model up to date and performing well as your data grows and evolves, run these steps multiple times. In addition, you can apply MLOps to your project to automate the AI/ML workflows throughout the model and data lifecycle. Use orchestrators to automate this flow and to handle the transition between the different building blocks in your project.

## Use RunInference

The RunInference API is a `PTransform` optimized for machine learning inference that lets you efficiently use ML models in your pipelines. The API includes the following features:

- To efficiently feed your model, it dynamically batches inputs based on pipeline throughput using Apache Beam's `BatchElements` transform.
- To balance memory and throughput usage, it determines the optimal number of models to load using a central model manager, and it shares these models across threads and processes as needed to maximize throughput.
- It ensures that your pipeline uses the most recently deployed version of your model through the [automatic model refresh](#automatic-model-refresh) feature.
- It supports [multiple frameworks and model hubs](#use-pre-trained-models), including TensorFlow, PyTorch, scikit-learn, XGBoost, Hugging Face, TensorFlow Hub, Vertex AI, TensorRT, and ONNX.
- It supports arbitrary frameworks through a [custom model handler](#use-custom-models).
- It supports [multi-model pipelines](#multi-model-pipelines).
- It lets you use GPUs on supported runners to increase inference speed. For more information, see [GPUs with Dataflow](https://cloud.google.com/dataflow/docs/gpu) in the Dataflow documentation.

### Support and limitations

- The RunInference API is supported in Apache Beam 2.40.0 and later versions.
- Model handlers are available for PyTorch, scikit-learn, TensorFlow, Hugging Face, Vertex AI, ONNX, TensorRT, and XGBoost. You can also use a custom model handler.
- The RunInference API supports batch and streaming pipelines.
- The RunInference API supports both remote inference and inference local to the runner worker.

### BatchElements PTransform

To take advantage of the vectorized inference optimizations that many models implement, the `BatchElements` transform is used as an intermediate step before the prediction for the model is made. This transform batches elements together. The batched elements are then converted to the format that the particular RunInference framework expects. For example, for numpy `ndarrays`, RunInference calls `numpy.stack()`, and for torch `Tensor` elements, it calls `torch.stack()`.

To customize the settings for `beam.BatchElements`, override the `batch_elements_kwargs` function in your `ModelHandler`. For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch.

For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements).
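As an illustration, such an override might look like the following sketch. The handler subclass and the batch sizes are illustrative, not prescriptive:

```python
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

class BatchTunedModelHandler(SklearnModelHandlerNumpy):
    """Illustrative handler that tunes how BatchElements forms batches."""

    def batch_elements_kwargs(self):
        # Ask BatchElements for batches of between 10 and 100 elements.
        return {'min_batch_size': 10, 'max_batch_size': 100}
```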
### Shared helper class

Using the `Shared` class within the RunInference implementation makes it possible to load the model only once per process and to share it with all DoFn instances created in that process. This feature reduces memory consumption and model loading time. For more information, see the [`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20).

### Modify a Python pipeline to use an ML model

To use the RunInference transform, add the following code to your pipeline:

```python
from apache_beam.ml.inference.base import RunInference

with pipeline as p:
    predictions = (p | 'Read' >> beam.ReadFromSource('a_source')
                     | 'RunInference' >> RunInference(<model_handler>))
```

Replace `model_handler` with the model handler setup code.

To import models, you need to configure a `ModelHandler` object that wraps the underlying model. Which model handler you import depends on the framework and on the type of data structure that contains the inputs. The `ModelHandler` object also lets you set environment variables needed for inference through the `env_vars` keyword argument. The following examples show some model handlers that you might want to import.

```python
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from tfx_bsl.public.beam.run_inference import CreateModelHandler
```
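For instance, a sketch of passing `env_vars` when constructing a handler might look like the following. The model path and the environment variable are placeholders:

```python
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Placeholder path and environment variable, shown only to illustrate
# the env_vars keyword argument.
model_handler = SklearnModelHandlerNumpy(
    model_uri='gs://my-bucket/models/model.pkl',
    env_vars={'EXAMPLE_INFERENCE_FLAG': '1'})
```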
### Use pre-trained models

This section provides requirements for using pre-trained models with PyTorch, Scikit-learn, and TensorFlow.

#### PyTorch

You need to provide a path to a file that contains the model's saved weights. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the PyTorch framework, complete the following steps:

1. Download the pre-trained weights and host them in a location that the pipeline can access.
2. Pass the path of the model weights to the PyTorch `ModelHandler` by using the following code: `state_dict_path=<path_to_weights>`.

See [this notebook](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_pytorch.ipynb) that illustrates running PyTorch models with Apache Beam.
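Putting the steps together, a sketch of a PyTorch handler configuration might look like the following. The weights path is a placeholder, and the model class stands in for your own architecture:

```python
import torch
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

class LinearRegression(torch.nn.Module):  # stand-in for your own model class
    def __init__(self, input_dim=1, output_dim=1):
        super().__init__()
        self.linear = torch.nn.Linear(input_dim, output_dim)

    def forward(self, x):
        return self.linear(x)

# state_dict_path points to the hosted weights from step 1 (placeholder path).
# model_class and model_params must match the class that produced the weights.
model_handler = PytorchModelHandlerTensor(
    state_dict_path='gs://my-bucket/models/weights.pth',
    model_class=LinearRegression,
    model_params={'input_dim': 1, 'output_dim': 1})
```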
#### Scikit-learn

You need to provide a path to a file that contains the pickled Scikit-learn model. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the Scikit-learn framework, complete the following steps:

1. Download the pickled model class and host it in a location that the pipeline can access.
2. Pass the path of the model to the Sklearn `ModelHandler` by using the following code: `model_uri=<path_to_pickled_file>` and `model_file_type: <ModelFileType>`, where you can specify `ModelFileType.PICKLE` or `ModelFileType.JOBLIB`, depending on how the model was serialized.

See [this notebook](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_sklearn.ipynb) that illustrates running Scikit-learn models with Apache Beam.
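For example, a handler for a joblib-serialized model might be configured like the following sketch, where the path is a placeholder:

```python
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Placeholder path; ModelFileType.JOBLIB is used because this model is
# assumed to have been saved with joblib.dump().
model_handler = SklearnModelHandlerNumpy(
    model_uri='gs://my-bucket/models/model.joblib',
    model_file_type=ModelFileType.JOBLIB)
```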
#### TensorFlow

To use TensorFlow with the RunInference API, you have two options:

1. Use the built-in TensorFlow model handlers in the Apache Beam SDK, `TFModelHandlerNumpy` and `TFModelHandlerTensor` (a minimal sketch follows this list).
   - Depending on the type of input for your model, use `TFModelHandlerNumpy` for `numpy` input and `TFModelHandlerTensor` for `tf.Tensor` input.
   - Use TensorFlow 2.7 or later.
   - Pass the path of the model to the TensorFlow `ModelHandler` by using `model_uri=<path_to_trained_model>`.
   - Alternatively, you can pass the path to the saved weights of the trained model and a function to build the model using `create_model_fn=<function>`, and set `model_type=ModelType.SAVED_WEIGHTS`. See [this notebook](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb) that illustrates running TensorFlow models with the built-in model handlers.
2. Use `tfx_bsl`.
   - Use this approach if your model input is of type `tf.Example`.
   - Use `tfx_bsl` version 1.10.0 or later.
   - Create a model handler using `tfx_bsl.public.beam.run_inference.CreateModelHandler()`.
   - Use the model handler with the [`apache_beam.ml.inference.base.RunInference`](/releases/pydoc/current/apache_beam.ml.inference.base.html) transform. See [this notebook](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_tensorflow.ipynb) that illustrates running TensorFlow models with Apache Beam and tfx-bsl.
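For option 1, a minimal sketch, assuming a SavedModel exported to a placeholder path:

```python
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy

# Placeholder path to a SavedModel directory; this handler expects
# numpy arrays as pipeline elements.
model_handler = TFModelHandlerNumpy(model_uri='gs://my-bucket/models/saved_model/')
```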
### Use custom models

If you would like to use a model that isn't covered by one of the supported frameworks, the RunInference API is designed flexibly to allow you to use any custom machine learning model. You only need to create your own `ModelHandler` or `KeyedModelHandler` with logic to load your model and to use it to run inference.

A simple example can be found in [this notebook](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_custom_inference.ipynb). The `load_model` method shows how to load the model using the popular `spaCy` package, while `run_inference` shows how to run inference on a batch of examples.
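As a rough skeleton, a custom handler might look like the following sketch. The method names follow the `ModelHandler` base class, and the spaCy model name is illustrative:

```python
from typing import Any, Dict, Iterable, Optional, Sequence

import spacy
from apache_beam.ml.inference.base import ModelHandler, PredictionResult


class SpacyModelHandler(ModelHandler[str, PredictionResult, Any]):
    """Sketch of a custom handler that wraps a spaCy model."""

    def __init__(self, model_name: str = 'en_core_web_sm'):
        self._model_name = model_name  # illustrative default

    def load_model(self):
        # Called once per process; the returned model is shared.
        return spacy.load(self._model_name)

    def run_inference(
        self,
        batch: Sequence[str],
        model: Any,
        inference_args: Optional[Dict[str, Any]] = None,
    ) -> Iterable[PredictionResult]:
        # Run the model on each element and pair each input with its output.
        for text in batch:
            yield PredictionResult(example=text, inference=model(text))
```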
### RunInference Patterns

This section suggests patterns and best practices that you can use to make your inference pipelines simpler, more robust, and more efficient.

#### Use a keyed ModelHandler object

If a key is attached to the examples, wrap `KeyedModelHandler` around the `ModelHandler` object:

```python
from apache_beam.ml.inference.base import KeyedModelHandler

keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...))
with pipeline as p:
    data = p | beam.Create([
        ('img1', torch.tensor([[1,2,3],[4,5,6],...])),
        ('img2', torch.tensor([[1,2,3],[4,5,6],...])),
        ('img3', torch.tensor([[1,2,3],[4,5,6],...])),
    ])
    predictions = data | RunInference(keyed_model_handler)
```

If you are unsure if your data is keyed, you can use `MaybeKeyedModelHandler`.

You can also use a `KeyedModelHandler` to load several different models based on their associated key.
The following example loads a model by using `config1`. That model is used for inference for all examples associated with `key1`. It loads a second model by using `config2`. That model is used for all examples associated with `key2` and `key3`.

```python
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import KeyModelMapping

keyed_model_handler = KeyedModelHandler([
    KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
    KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
    data = p | beam.Create([
        ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
        ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
        ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
    ])
    predictions = data | RunInference(keyed_model_handler)
```

For a more detailed example, see the notebook [Run ML inference with multiple differently-trained models](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/per_key_models.ipynb).

Loading multiple models at the same time increases the risk of out of memory errors (OOMs). By default, `KeyedModelHandler` doesn't limit the number of models loaded into memory at the same time. If the models don't all fit into memory, your pipeline might fail with an out of memory error. To avoid this issue, use the `max_models_per_worker_hint` parameter to set the maximum number of models that can be loaded into memory at the same time.

The following example loads at most two models per SDK worker process at a time. It unloads models that aren't currently in use.

```python
mhs = [
    KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
    KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
    KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
    KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
```
Runners that have multiple SDK worker processes on a given machine load at most `max_models_per_worker_hint * <num worker processes>` models onto the machine. For example, with a hint of 2 and eight SDK worker processes, up to 16 models might be loaded on one machine.

Leave enough space for the models and for any additional memory needs from other transforms. Because the memory might not be released immediately after a model is offloaded, leaving an additional buffer is recommended.

**Note**: Having many models but a small `max_models_per_worker_hint` can cause *memory thrashing*, where a large amount of execution time is used to swap models in and out of memory. To reduce the likelihood and impact of memory thrashing, if you're using a distributed runner, insert a [`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) transform before your inference step. The `GroupByKey` transform reduces thrashing by ensuring that elements with the same key and model are collocated on the same worker.

For more information, see [`KeyedModelHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.KeyedModelHandler).

#### Use the PredictionResult object

When doing a prediction in Apache Beam, the output `PCollection` includes both the keys of the input examples and the inferences. Including both of these items in the output allows you to find the input that determined the predictions.

The `PredictionResult` object is a `NamedTuple` that contains both the input and the inferences, named `example` and `inference`, respectively. When keys are passed with the input data to the RunInference transform, the output `PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and the `PredictionResult` object. Your pipeline interacts with a `PredictionResult` object in steps after the RunInference transform.

```python
class PostProcessor(beam.DoFn):
    def process(self, element: Tuple[str, PredictionResult]):
        key, prediction_result = element
        inputs = prediction_result.example
        predictions = prediction_result.inference

        # Post-processing logic
        result = ...

        yield (key, result)

with pipeline as p:
    output = (
        p | 'Read' >> beam.ReadFromSource('a_source')
          | 'PyTorchRunInference' >> RunInference(<keyed_model_handler>)
          | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
```

If you need to use this object explicitly, include the following line in your pipeline to import the object:

```python
from apache_beam.ml.inference.base import PredictionResult
```
</code></pre><p>For more information, see the <a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/base.py#L65><code>PredictionResult</code> documentation</a>.</p><h4 id=automatic-model-refresh>Automatic model refresh</h4><p>To automatically update the model being used with the RunInference <code>PTransform</code> without stopping the pipeline, pass a <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata><code>ModelMetadata</code></a> side input <code>PCollection</code> to the RunInference input parameter <code>model_metadata_pcoll</code>.</p><p><code>ModelMetdata</code> is a <code>NamedTuple</code> containing:</p><ul><li><p><code>model_id</code>: Unique identifier for the model. This can be a file path or a URL where the model can be accessed. It is used to load the model for inference. The URL or file path must be in the compatible format so that the respective <code>ModelHandlers</code> can load the models without errors.</p><p>For example, <code>PyTorchModelHandler</code> initially loads a model using weights and a model class. If you pass in weights from a different model class when you update the model using side inputs, the model doesn&rsquo;t load properly, because it expects the weights from the original model class.</p></li><li><p><code>model_name</code>: Human-readable name for the model. You can use this name to identify the model in the metrics generated by the RunInference transform.</p></li></ul><p>Use cases:</p><ul><li>Use <code>WatchFilePattern</code> as side input to the RunInference <code>PTransform</code> to automatically update the ML model. For more information, see <a href=https://beam.apache.org/documentation/ml/side-input-updates>Use <code>WatchFilePattern</code> as side input to auto-update ML models in RunInference</a>.</li></ul><p>The side input <code>PCollection</code> must follow the <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html?highlight=assingleton#apache_beam.pvalue.AsSingleton"><code>AsSingleton</code></a> view to avoid errors.</p><p><strong>Note</strong>: If the main <code>PCollection</code> emits inputs and a side input has yet to receive inputs, the main <code>PCollection</code> is buffered until there is
<p><strong>Note</strong>: If the main <code>PCollection</code> emits inputs before the side input has received any, the main <code>PCollection</code> is buffered until the side input is updated. This can happen with globally windowed side inputs that use data-driven triggers, such as <code>AfterCount</code> and <code>AfterProcessingTime</code>. Until the side input is updated, emit a default or initial model ID so that the respective <code>ModelHandler</code> can load a model and inference is not blocked.</p><h4 id=preprocess-and-postprocess-your-records>Preprocess and postprocess your records</h4><p>With RunInference, you can add preprocessing and postprocessing operations to your transform.
To apply preprocessing operations, use <code>with_preprocess_fn</code> on your model handler:</p><pre tabindex=0><code>inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
</code></pre><p>To apply postprocessing operations, use <code>with_postprocess_fn</code> on your model handler:</p><pre tabindex=0><code>inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
</code></pre><p>You can also chain multiple pre- and postprocessing operations:</p><pre tabindex=0><code>inference = pcoll | RunInference(
    model_handler.with_preprocess_fn(
        lambda x : do_something(x)
    ).with_preprocess_fn(
        lambda x : do_something_else(x)
    ).with_postprocess_fn(
        lambda x : do_something_after_inference(x)
    ).with_postprocess_fn(
        lambda x : do_something_else_after_inference(x)
    ))
</code></pre><p>The preprocessing function runs before batching and inference. It maps your input <code>PCollection</code> to the base input type of the model handler. If you apply multiple preprocessing functions, they run on your original <code>PCollection</code> in order from last applied to first applied: in the example above, <code>do_something_else</code> runs first, followed by <code>do_something</code>.</p><p>The postprocessing function runs after inference. It maps the output type of the base model handler to your desired output type. If you apply multiple postprocessing functions, they run on your inference result in order from first applied to last applied: in the example above, <code>do_something_after_inference</code> runs first, followed by <code>do_something_else_after_inference</code>.</p><h4 id=handle-errors>Handle errors</h4><p>To handle errors robustly while using RunInference, you can use a <em>dead-letter queue</em>, which outputs failed records into a separate <code>PCollection</code> for further processing.
This <code>PCollection</code> can then be analyzed and sent to a storage system, where it can be reviewed and resubmitted to the pipeline, or discarded.
RunInference has built-in support for dead-letter queues. You can use a dead-letter queue by applying <code>with_exception_handling</code> to your RunInference transform:</p><pre tabindex=0><code>main, other = pcoll | RunInference(model_handler).with_exception_handling()
other.failed_inferences | beam.Map(print) # insert logic to handle failed records here
</code></pre><p>You can also apply this pattern to RunInference transforms with associated pre- and postprocessing operations:</p><pre tabindex=0><code>main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
other.failed_preprocessing[0] | beam.Map(print) # handles failed preprocess operations, indexed in the order in which they were applied
other.failed_inferences | beam.Map(print) # handles failed inferences
other.failed_postprocessing[0] | beam.Map(print) # handles failed postprocess operations, indexed in the order in which they were applied
</code></pre><h4 id=run-inference-from-a-java-pipeline>Run inference from a Java pipeline</h4><p>The RunInference API is available with the Beam Java SDK versions 2.41.0 and later through Apache Beam&rsquo;s <a href=/documentation/programming-guide/#multi-language-pipelines>Multi-language Pipelines framework</a>. For information about the Java wrapper transform, see <a href=https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/RunInference.java>RunInference.java</a>. To try it out, see the <a href=https://github.com/apache/beam/tree/master/examples/multi-language>Java Sklearn Mnist Classification example</a>. Additionally, see <a href=https://beam.apache.org/documentation/ml/multi-language-inference/>Using RunInference from Java SDK</a> for an example of a composite Python transform that uses the RunInference API along with preprocessing and postprocessing from a Beam Java SDK pipeline.</p><h2 id=custom-inference>Custom inference</h2><p>The RunInference API doesn&rsquo;t currently support making remote inference calls using, for example, the Natural Language API or the Cloud Vision API. To use these remote APIs with Apache Beam, you need to write custom inference calls. The <a href=https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/custom_remote_inference.ipynb>Remote inference in Apache Beam notebook</a> shows how to implement a custom remote inference call using <code>beam.DoFn</code>; a minimal sketch follows the list below. When you implement remote inference for real-life projects, consider the following factors:</p><ul><li><p>API quotas and the heavy load you might incur on your external API. To optimize calls to an external API, you can configure <code>PipelineOptions</code> to limit the number of parallel calls to the remote API.</p></li><li><p>Be prepared to encounter, identify, and handle failures as gracefully as possible. Use techniques like exponential backoff and dead-letter queues (unprocessed-message queues).</p></li><li><p>When running inference with an external API, batch your inputs together for more efficient execution.</p></li><li><p>Consider monitoring and measuring the performance of the pipeline when you deploy it, because monitoring provides insight into the status and health of the application.</p></li></ul>
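<p>The following is a minimal sketch, not Beam&rsquo;s official pattern, of a custom remote inference <code>DoFn</code> that combines batching, exponential backoff, and a dead-letter output. Here, <code>make_api_client</code> and its <code>predict</code> method are hypothetical stand-ins for your external service client, and the batch sizes and retry counts are arbitrary:</p><pre tabindex=0><code>import time

import apache_beam as beam

class RemoteInferenceDoFn(beam.DoFn):
  def setup(self):
    # Create the API client once per DoFn instance, not once per element.
    self.client = make_api_client()  # hypothetical helper

  def process(self, batch):
    # Retry with exponential backoff before giving up on the batch.
    for attempt in range(5):
      try:
        for example, prediction in zip(batch, self.client.predict(batch)):
          yield example, prediction
        return
      except Exception:
        time.sleep(2 ** attempt)
    # After exhausting retries, route the batch to a dead-letter output.
    yield beam.pvalue.TaggedOutput(&#39;failed&#39;, batch)

results = (
    pcoll
    | &#39;Batch&#39; &gt;&gt; beam.BatchElements(min_batch_size=8, max_batch_size=64)
    | &#39;RemoteInference&#39; &gt;&gt; beam.ParDo(
        RemoteInferenceDoFn()).with_outputs(&#39;failed&#39;, main=&#39;predictions&#39;))
</code></pre><h2 id=multi-model-pipelines>Multi-model pipelines</h2><p>Use the RunInference transform to add multiple inference models to your pipeline. Multi-model pipelines can be useful for A/B testing or for building out cascade models made up of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. For more information, see <a href=https://beam.apache.org/documentation/ml/multi-model-pipelines/>Multi-model pipelines</a>.</p><h3 id=ab-pattern>A/B Pattern</h3><pre tabindex=0><code>with pipeline as p: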
  data = p | &#39;Read&#39; &gt;&gt; beam.ReadFromSource(&#39;a_source&#39;)
  model_a_predictions = data | RunInference(&lt;model_handler_A&gt;)
  model_b_predictions = data | RunInference(&lt;model_handler_B&gt;)
</code></pre><p>Here, <code>model_handler_A</code> and <code>model_handler_B</code> are placeholders for your model handler setup code.</p><h3 id=cascade-pattern>Cascade Pattern</h3><pre tabindex=0><code>with pipeline as p:
  data = p | &#39;Read&#39; &gt;&gt; beam.ReadFromSource(&#39;a_source&#39;)
  model_a_predictions = data | RunInference(&lt;model_handler_A&gt;)
  model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(&lt;model_handler_B&gt;)
</code></pre><p>Here, <code>model_handler_A</code> and <code>model_handler_B</code> are placeholders for your model handler setup code.</p><h3 id=use-resource-hints-for-different-model-requirements>Use Resource Hints for Different Model Requirements</h3><p>When you use multiple models in a single pipeline, the models may have different memory or worker SKU requirements.
Resource hints allow you to provide information to a runner about the compute resource requirements for each step in your
pipeline.</p><p>For example, the following snippet extends the previous cascade pattern with hints for each RunInference call
to specify RAM and hardware accelerator requirements:</p><pre tabindex=0><code>with pipeline as p:
  data = p | &#39;Read&#39; &gt;&gt; beam.ReadFromSource(&#39;a_source&#39;)
  model_a_predictions = data | RunInference(&lt;model_handler_A&gt;).with_resource_hints(min_ram=&#34;20GB&#34;)
  model_b_predictions = (
      model_a_predictions
      | beam.Map(some_post_processing)
      | RunInference(&lt;model_handler_B&gt;).with_resource_hints(
          min_ram=&#34;4GB&#34;,
          accelerator=&#34;type:nvidia-tesla-k80;count:1;install-nvidia-driver&#34;))
</code></pre><p>For more information on resource hints, see <a href=/documentation/runtime/resource-hints/>Resource hints</a>.</p><h2 id=model-validation>Model validation</h2><p>Model validation allows you to benchmark your model’s performance against a previously unseen dataset. You can extract chosen metrics, create visualizations, log metadata, and compare the performance of different models, with the end goal of validating whether your model is ready to deploy. Beam provides support for running model evaluation on a TensorFlow model directly inside your pipeline.</p><p>The <a href=/documentation/ml/model-evaluation>ML model evaluation</a> page shows how to integrate model evaluation into your pipeline by using <a href=https://www.tensorflow.org/tfx/guide/tfma>TensorFlow Model Analysis (TFMA)</a>.</p><h2 id=troubleshooting>Troubleshooting</h2><p>If you run into problems with your pipeline or job, this section lists issues that you might encounter and provides suggestions for how to fix them.</p><h3 id=unable-to-batch-tensor-elements>Unable to batch tensor elements</h3><p>RunInference uses dynamic batching. However, the RunInference API cannot batch tensor elements of different sizes, so samples passed to the <code>RunInference</code> transform must have the same dimensions or length. If you provide images of different sizes or word embeddings of different lengths, the following error might occur:</p><p><code>File "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py", line 232, in run_inference batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack expects each tensor to be equal size, but got [12] at entry 0 and [10] at entry 1 [while running 'PyTorchRunInference/ParDo(_RunInferenceDoFn)']</code></p><p>To avoid this issue, either use elements of the same size or disable batching.</p><p><strong>Option 1: Use elements of the same size</strong></p><p>Use elements of the same size, or resize the inputs. For computer vision applications, resize image inputs so that they have the same dimensions. For natural language processing (NLP) applications that have text of varying length, resize the text or word embeddings to make them the same length. When you work with texts of varying length, resizing might not be possible. In this scenario, you can disable batching (see Option 2).</p><p><strong>Option 2: Disable batching</strong></p><p>Disable batching by overriding the <code>batch_elements_kwargs</code> function in your <code>ModelHandler</code> and setting the maximum batch size (<code>max_batch_size</code>) to one. For more information, see
<a href=/documentation/ml/about-ml/#batchelements-ptransform>BatchElements PTransforms</a>. For an example, see our <a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py>language modeling example</a>.</p>
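<p>For illustration, here is a minimal sketch of that override, assuming a PyTorch tensor model handler; any other <code>ModelHandler</code> subclass works the same way:</p><pre tabindex=0><code>from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

class UnbatchedModelHandler(PytorchModelHandlerTensor):
  # Cap batches at a single element so that tensors of different
  # shapes are never stacked together.
  def batch_elements_kwargs(self):
    return {&#39;max_batch_size&#39;: 1}
</code></pre><h2 id=related-links>Related links</h2><ul><li><a href=/documentation/transforms/python/elementwise/runinference>RunInference transforms</a></li><li><a href=https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference>RunInference API pipeline examples</a></li><li><a href=https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/run_inference_basic.ipynb>RunInference public codelab</a></li><li><a href=https://github.com/apache/beam/tree/master/examples/notebooks/beam-ml>RunInference notebooks</a></li><li><a href="http://s.apache.org/beam-community-metrics/d/ZpS8Uf44z/python-ml-runinference-benchmarks?orgId=1">RunInference benchmarks</a></li></ul><table><tr><td><a><table align=left style=margin-right:1em><td><a class=button target=_blank href=https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.html#apache_beam.ml.inference.RunInference><img src=https://beam.apache.org/images/logos/sdks/python.png width=32px height=32px alt=Pydoc>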
Pydoc</a></td></table><p><br><br><br></p></a></td><td><a target=_blank class=button href=https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/extensions/python/transforms/RunInference.html><img src=https://beam.apache.org/images/logos/sdks/java.png width=20px height=30px alt=Javadoc>
Javadoc</a></td></tr></table><div class=feedback><p class=update>Last updated on 2024/05/03</p></div></div></div></body></html>