blob: aabce9ec633137c89663ad26a790fafc15a35c21 [file] [log] [blame]
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Testing I/O Transforms</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<!-- Site behaviour bundles. Each one carries `defer`, so it is fetched without blocking the parser and runs in document order once parsing finishes. -->
<script src="/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js" type="text/javascript" defer></script>
<script src="/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js" type="text/javascript" defer></script>
<script src="/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js" type="text/javascript" defer></script>
<script src="/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js" type="text/javascript" defer></script>
<script src="/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js" type="text/javascript" defer></script>
<script src="/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js" type="text/javascript" defer></script>
<script src="/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js" type="text/javascript" defer></script>
<script src="/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js" type="text/javascript" defer></script>
<script src="/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js" type="text/javascript" defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/io/testing/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","//www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script><script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<!-- Quick links that sit beside the brand logo in the mobile navbar header. -->
<a href="/get-started/" class="navbar-link">Get Started</a>
<a href="/documentation/" class="navbar-link">Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/testing.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><!-- Mobile Apache menu: every entry opens in a new tab, so each link carries rel=noopener to keep the opened page from reaching this one through window.opener. --><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank rel=noopener href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/security/>Security</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank rel=noopener href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<!-- Desktop navbar: plain top-level links that follow the Documentation dropdown. -->
<a href="/community/" class="navbar-link">Community</a>
<a href="/contribute/" class="navbar-link">Contribute</a>
<a href="/blog/" class="navbar-link">Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><!-- Desktop icon bar: search toggle, edit-on-GitHub link and the Apache menu. Links that open a new tab carry rel=noopener so the opened page cannot reach this one through window.opener. --><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank rel=noopener href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/testing.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank rel=noopener href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>
// Helpers for the navbar search box and the mobile menu. showSearch, endSearch
// and openMenu are invoked from inline onclick attributes in the navbars, so all
// of these must remain global function declarations.

// Sets the search prompt as the placeholder of every text input on the page.
function addPlaceholder() {
  $("input:text").attr("placeholder", "What are you looking for?");
}

// Reveals the search bar and hides the icon bar.
function showSearch() {
  addPlaceholder();
  document.querySelector(".searchBar").classList.remove("disappear");
  document.querySelector("#iconsBar").classList.add("disappear");
}

// Hides the search bar and brings the icon bar back.
function endSearch() {
  document.querySelector(".searchBar").classList.add("disappear");
  document.querySelector("#iconsBar").classList.remove("disappear");
}

// Flips the "fixedPosition" class on the body element.
function blockScroll() {
  $("body").toggleClass("fixedPosition");
}

// Click handler of the mobile menu-open button.
function openMenu() {
  addPlaceholder();
  blockScroll();
}
</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Concepts</span><ul class=section-nav-list><li><a href=/documentation/basics/>Basics of the Beam model</a></li><li><a href=/documentation/runtime/model/>How Beam executes a pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Beam programming guide</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/>Overview</a></li><li><a href=/documentation/programming-guide/#creating-a-pipeline>Pipelines</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>PCollections</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pcollections>Creating a PCollection</a></li><li><a href=/documentation/programming-guide/#pcollection-characteristics>PCollection characteristics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transforms</span><ul class=section-nav-list><li><a 
href=/documentation/programming-guide/#applying-transforms>Applying transforms</a></li><li><span class=section-nav-list-title>Core Beam transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pardo>ParDo</a></li><li><a href=/documentation/programming-guide/#groupbykey>GroupByKey</a></li><li><a href=/documentation/programming-guide/#cogroupbykey>CoGroupByKey</a></li><li><a href=/documentation/programming-guide/#combine>Combine</a></li><li><a href=/documentation/programming-guide/#flatten>Flatten</a></li><li><a href=/documentation/programming-guide/#partition>Partition</a></li></ul></li><li><a href=/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms>Requirements for user code</a></li><li><a href=/documentation/programming-guide/#side-inputs>Side inputs</a></li><li><a href=/documentation/programming-guide/#additional-outputs>Additional outputs</a></li><li><a href=/documentation/programming-guide/#composite-transforms>Composite transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline I/O</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pipeline-io>Using I/O transforms</a></li><li><a href=/documentation/io/connectors/>I/O connectors</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>I/O connector guides</span><ul class=section-nav-list><li><a href=/documentation/io/built-in/parquet/>Apache Parquet I/O connector</a></li><li><a href=/documentation/io/built-in/hadoop/>Hadoop Input/Output Format IO</a></li><li><a href=/documentation/io/built-in/hcatalog/>HCatalog IO</a></li><li><a href=/documentation/io/built-in/google-bigquery/>Google BigQuery I/O connector</a></li><li><a href=/documentation/io/built-in/snowflake/>Snowflake I/O connector</a></li><li><a href=/documentation/io/built-in/cdap/>CDAP I/O connector</a></li><li><a href=/documentation/io/built-in/sparkreceiver/>Spark Receiver I/O 
connector</a></li><li><a href=/documentation/io/built-in/singlestore/>SingleStoreDB I/O connector</a></li><li><a href=/documentation/io/built-in/webapis/>Web APIs I/O connector</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Developing new I/O connectors</span><ul class=section-nav-list><li><a href=/documentation/io/developing-io-overview/>Overview: Developing connectors</a></li><li><a href=/documentation/io/developing-io-java/>Developing connectors (Java)</a></li><li><a href=/documentation/io/developing-io-python/>Developing connectors (Python)</a></li><li><a href=/documentation/io/io-standards/>I/O Standards</a></li></ul></li><li><a href=/documentation/io/testing/>Testing I/O transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Schemas</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#what-is-a-schema>What is a schema</a></li><li><a href=/documentation/programming-guide/#schemas-for-pl-types>Schemas for programming language types</a></li><li><a href=/documentation/programming-guide/#schema-definition>Schema definition</a></li><li><a href=/documentation/programming-guide/#logical-types>Logical types</a></li><li><a href=/documentation/programming-guide/#creating-schemas>Creating schemas</a></li><li><a href=/documentation/programming-guide/#using-schemas>Using schemas</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data encoding and type safety</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#data-encoding-and-type-safety>Data encoding basics</a></li><li><a href=/documentation/programming-guide/#specifying-coders>Specifying coders</a></li><li><a href=/documentation/programming-guide/#default-coders-and-the-coderregistry>Default coders and the CoderRegistry</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Windowing</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#windowing>Windowing basics</a></li><li><a href=/documentation/programming-guide/#provided-windowing-functions>Provided windowing functions</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Setting your PCollection’s windowing function</a></li><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Watermarks and late data</a></li><li><a href=/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements>Adding timestamps to a PCollection’s elements</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Triggers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#triggers>Trigger basics</a></li><li><a href=/documentation/programming-guide/#event-time-triggers>Event time triggers and the default trigger</a></li><li><a href=/documentation/programming-guide/#processing-time-triggers>Processing time triggers</a></li><li><a href=/documentation/programming-guide/#data-driven-triggers>Data-driven triggers</a></li><li><a href=/documentation/programming-guide/#setting-a-trigger>Setting a trigger</a></li><li><a href=/documentation/programming-guide/#composite-triggers>Composite triggers</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Metrics</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#metrics>Metrics basics</a></li><li><a href=/documentation/programming-guide/#types-of-metrics>Types of metrics</a></li><li><a href=/documentation/programming-guide/#querying-metrics>Querying metrics</a></li><li><a href=/documentation/programming-guide/#using-metrics>Using metrics in pipeline</a></li><li><a href=/documentation/programming-guide/#export-metrics>Export metrics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>State and Timers</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#types-of-state>Types of state</a></li><li><a href=/documentation/programming-guide/#deferred-state-reads>Deferred state reads</a></li><li><a href=/documentation/programming-guide/#timers>Timers</a></li><li><a href=/documentation/programming-guide/#garbage-collecting-state>Garbage collecting state</a></li><li><a href=/documentation/programming-guide/#state-timers-examples>State and timers examples</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Splittable DoFns</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#sdf-basics>Basics</a></li><li><a href=/documentation/programming-guide/#sizing-and-progress>Sizing and progress</a></li><li><a href=/documentation/programming-guide/#user-initiated-checkpoint>User-initiated checkpoint</a></li><li><a href=/documentation/programming-guide/#runner-initiated-split>Runner initiated split</a></li><li><a href=/documentation/programming-guide/#watermark-estimation>Watermark estimation</a></li><li><a href=/documentation/programming-guide/#truncating-during-drain>Truncating during drain</a></li><li><a href=/documentation/programming-guide/#bundle-finalization>Bundle finalization</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Multi-language Pipelines</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#create-x-lang-transforms>Creating cross-language transforms</a></li><li><a href=/documentation/programming-guide/#use-x-lang-transforms>Using cross-language transforms</a></li><li><a href=/documentation/programming-guide/#x-lang-transform-runner-support>Runner Support</a></li></ul></li><li><a href=/documentation/programming-guide/#batched-dofns>Batched DoFns</a></li><li><a href=/documentation/programming-guide/#transform-service>Transform service</a></li></ul></li><li class=section-nav-item--collapsible><span 
class=section-nav-list-title>Pipeline development lifecycle</span><ul class=section-nav-list><li><a href=/documentation/pipelines/design-your-pipeline/>Design Your Pipeline</a></li><li><a href=/documentation/pipelines/create-your-pipeline/>Create Your Pipeline</a></li><li><a href=/documentation/pipelines/test-your-pipeline/>Test Your Pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Common pipeline patterns</span><ul class=section-nav-list><li><a href=/documentation/patterns/overview/>Overview</a></li><li><a href=/documentation/patterns/file-processing/>File processing</a></li><li><a href=/documentation/patterns/side-inputs/>Side inputs</a></li><li><a href=/documentation/patterns/pipeline-options/>Pipeline options</a></li><li><a href=/documentation/patterns/custom-io/>Custom I/O</a></li><li><a href=/documentation/patterns/custom-windows/>Custom windows</a></li><li><a href=/documentation/patterns/bigqueryio/>BigQueryIO</a></li><li><a href=/documentation/patterns/ai-platform/>AI Platform</a></li><li><a href=/documentation/patterns/schema/>Schema</a></li><li><a href=/documentation/patterns/bqml/>BigQuery ML</a></li><li><a href=/documentation/patterns/grouping-elements-for-efficient-external-service-calls/>Grouping elements for efficient external service calls</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>AI/ML pipelines</span><ul class=section-nav-list><li><a href=/documentation/ml/overview/>Get started with AI/ML</a></li><li><a href=/documentation/ml/about-ml/>About Beam ML</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Prediction and inference</span><ul class=section-nav-list><li><a href=/documentation/ml/inference-overview/>Overview</a></li><li><a href=/documentation/ml/multi-model-pipelines/>Build a pipeline with multiple models</a></li><li><a href=/documentation/ml/tensorrt-runinference>Build a custom model handler with 
TensorRT</a></li><li><a href=/documentation/ml/large-language-modeling>Use LLM inference</a></li><li><a href=/documentation/ml/multi-language-inference/>Build a multi-language inference pipeline</a></li><li><a href=/documentation/ml/side-input-updates/>Update your model in production</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data processing</span><ul class=section-nav-list><li><a href=/documentation/ml/preprocess-data/>Preprocess data</a></li><li><a href=/documentation/ml/data-processing/>Explore your data</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Workflow orchestration</span><ul class=section-nav-list><li><a href=/documentation/ml/orchestration/>Use ML-OPS workflow orchestrators</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Model training</span><ul class=section-nav-list><li><a href=/documentation/ml/per-entity-training>Per-entity training</a></li><li><a href=/documentation/ml/online-clustering/>Online clustering</a></li><li><a href=/documentation/ml/model-evaluation/>ML model evaluation</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Use cases</span><ul class=section-nav-list><li><a href=/documentation/ml/anomaly-detection/>Build an anomaly detection pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Reference</span><ul class=section-nav-list><li><a href=/documentation/ml/runinference-metrics/>RunInference metrics</a></li><li><a href=/documentation/ml/model-evaluation/>Model validation</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Runtime systems</span><ul class=section-nav-list><li><a href=/documentation/runtime/environments/>Container environments</a></li><li><a href=/documentation/runtime/resource-hints/>Resource hints</a></li><li><a 
href=/documentation/runtime/sdk-harness-config/>SDK Harness Configuration</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transform catalog</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Python</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Enrichment</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/enrichment/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-bigtable/>Bigtable example</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-vertexai/>Vertex AI Feature Store examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/python/elementwise/flatmap/>FlatMap</a></li><li><a href=/documentation/transforms/python/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/python/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/python/elementwise/map/>Map</a></li><li><a href=/documentation/transforms/python/elementwise/mltransform/>MLTransform</a></li><li><a href=/documentation/transforms/python/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/python/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/python/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/python/elementwise/reify/>Reify</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>RunInference</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/runinference/>Overview</a></li><li><a 
href=/documentation/transforms/python/elementwise/runinference-pytorch/>PyTorch examples</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-sklearn/>Sklearn examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/python/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/python/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/python/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/python/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/combineglobally/>CombineGlobally</a></li><li><a href=/documentation/transforms/python/aggregation/combineperkey/>CombinePerKey</a></li><li><a href=/documentation/transforms/python/aggregation/combinevalues/>CombineValues</a></li><li><a href=/documentation/transforms/python/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/python/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/python/aggregation/groupby/>GroupBy</a></li><li><a href=/documentation/transforms/python/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/python/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/python/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/python/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/python/aggregation/min/>Min</a></li><li><a 
href=/documentation/transforms/python/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/python/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/python/aggregation/top/>Top</a></li><li><a href=/documentation/transforms/python/aggregation/tolist/>ToList</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/other/create/>Create</a></li><li><a href=/documentation/transforms/python/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/python/other/reshuffle/>Reshuffle</a></li><li><a href=/documentation/transforms/python/other/windowinto/>WindowInto</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Java</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/java/elementwise/flatmapelements/>FlatMapElements</a></li><li><a href=/documentation/transforms/java/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/java/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/java/elementwise/mapelements/>MapElements</a></li><li><a href=/documentation/transforms/java/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/java/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/java/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/java/elementwise/reify/>Reify</a></li><li><a href=/documentation/transforms/java/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/java/elementwise/values/>Values</a></li><li><a 
href=/documentation/transforms/java/elementwise/withkeys/>WithKeys</a></li><li><a href=/documentation/transforms/java/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/java/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/java/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/combine/>Combine</a></li><li><a href=/documentation/transforms/java/aggregation/combinewithcontext/>CombineWithContext</a></li><li><a href=/documentation/transforms/java/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/java/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/java/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/java/aggregation/hllcount/>HllCount</a></li><li><a href=/documentation/transforms/java/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/java/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/java/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/java/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/java/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/java/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/java/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/other/create/>Create</a></li><li><a href=/documentation/transforms/java/other/flatten/>Flatten</a></li><li><a 
href=/documentation/transforms/java/other/passert/>PAssert</a></li><li><a href=/documentation/transforms/java/other/view/>View</a></li><li><a href=/documentation/transforms/java/other/window/>Window</a></li></ul></li></ul></li></ul></li><li><a href=/documentation/glossary/>Glossary</a></li><li><a href=https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam>Beam Wiki <img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#testing-io-transforms-in-apache-beam>Testing I/O Transforms in Apache Beam</a></li><li><a href=#introduction>Introduction</a></li><li><a href=#a-note-on-performance-benchmarking>A note on performance benchmarking</a></li><li><a href=#test-balance-unit-vs-integration>Test Balance - Unit vs Integration</a></li><li><a href=#examples>Examples</a></li><li><a href=#unit-tests>Unit Tests</a><ul><li><a href=#goals>Goals</a></li><li><a href=#non-goals>Non-goals</a></li><li><a href=#implementing-unit-tests>Implementing unit tests</a></li><li><a href=#use-fakes>Use fakes</a></li><li><a href=#network-failure>Network failure</a></li></ul></li><li><a href=#batching>Batching</a></li><li><a href=#i-o-transform-integration-tests>I/O Transform Integration Tests</a><ul><li><a href=#it-goals>Goals</a></li><li><a href=#integration-tests-data-stores-and-kubernetes>Integration tests, data stores, and Kubernetes</a></li><li><a href=#running-integration-tests-on-your-machine>Running integration tests on your machine</a><ul><li><a href=#datastore-setup-cleanup>Data store setup/cleanup</a></li><li><a href=#running-a-test>Running a particular test</a></li></ul></li><li><a href=#running-integration-tests-on-pull-requests>Running Integration Tests on Pull Requests</a></li><li><a href=#performance-testing-dashboard>Performance testing dashboard</a></li><li><a href=#implementing-integration-tests>Implementing 
Integration Tests</a><ul><li><a href=#test-code>Test Code</a></li><li><a href=#kubernetes-scripts>Kubernetes scripts</a></li><li><a href=#jenkins-jobs>Jenkins jobs</a></li></ul></li><li><a href=#small-scale-and-large-scale-integration-tests>Small Scale and Large Scale Integration Tests</a></li></ul></li></ul></nav></nav><div class="body__contained body__section-nav arrow-list arrow-list--no-mt"><h2 id=testing-io-transforms-in-apache-beam>Testing I/O Transforms in Apache Beam</h2><p><em>Examples and design patterns for testing Apache Beam I/O transforms</em></p><nav class=language-switcher><strong>Adapt for:</strong><ul><li data-value=java class=active>Java SDK</li><li data-value=py>Python SDK</li></ul></nav><blockquote><p>Note: This guide is still in progress. There is an open issue to finish the guide: <a href=https://issues.apache.org/jira/browse/BEAM-1025>BEAM-1025</a>.</p></blockquote><h2 id=introduction>Introduction</h2><p>This document explains the set of tests that the Beam community recommends based on our past experience writing I/O transforms. If you wish to contribute your I/O transform to the Beam community, we&rsquo;ll ask you to implement these tests.</p><p>While it is standard to write unit tests and integration tests, there are many possible definitions. 
Our definitions are:</p><ul><li><strong>Unit Tests:</strong><ul><li>Goal: verifying correctness of the transform only - core behavior, corner cases, etc.</li><li>Data store used: an in-memory version of the data store (if available), otherwise you&rsquo;ll need to write a <a href=#use-fakes>fake</a></li><li>Data set size: tiny (10s to 100s of rows)</li></ul></li><li><strong>Integration Tests:</strong><ul><li>Goal: catch problems that occur when interacting with real versions of the runners/data store</li><li>Data store used: an actual instance, pre-configured before the test</li><li>Data set size: small to medium (1000 rows to 10s of GBs)</li></ul></li></ul><h2 id=a-note-on-performance-benchmarking>A note on performance benchmarking</h2><p>We do not advocate writing a separate test specifically for performance benchmarking. Instead, we recommend setting up integration tests that can accept the necessary parameters to cover many different testing scenarios.</p><p>For example, if integration tests are written according to the guidelines below, the integration tests can be run on different runners (either local or in a cluster configuration) and against a data store that is a small instance with a small data set, or a large production-ready cluster with larger data set. This can provide coverage for a variety of scenarios - one of them is performance benchmarking.</p><h2 id=test-balance-unit-vs-integration>Test Balance - Unit vs Integration</h2><p>It&rsquo;s easy to cover a large amount of code with an integration test, but it is then hard to find a cause for test failures and the test is flakier.</p><p>However, there is a valuable set of bugs found by tests that exercise multiple workers reading/writing to data store instances that have multiple nodes (eg, read replicas, etc.). Those scenarios are hard to find with unit tests and we find they commonly cause bugs in I/O transforms.</p><p>Our test strategy is a balance of those 2 contradictory needs. 
We recommend doing as much testing as possible in unit tests, and writing a single, small integration test that can be run in various configurations.</p><h2 id=examples>Examples</h2><p>Java:</p><ul><li><a href=https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java>BigtableIO</a>&rsquo;s testing implementation is considered the best example of current best practices for unit testing <code>Source</code>s</li><li><a href=https://github.com/apache/beam/blob/master/sdks/java/io/jdbc>JdbcIO</a> has the current best practice examples for writing integration tests.</li><li><a href=https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch>ElasticsearchIO</a> demonstrates testing for bounded read/write</li><li><a href=https://github.com/apache/beam/tree/master/sdks/java/io/mqtt>MqttIO</a> and <a href=https://github.com/apache/beam/tree/master/sdks/java/io/amqp>AmqpIO</a> demonstrate unbounded read/write</li></ul><p>Python:</p><ul><li><a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/avroio_test.py>avroio_test</a> for examples of testing liquid sharding, <code>source_test_utils</code>, <code>assert_that</code> and <code>equal_to</code></li></ul><h2 id=unit-tests>Unit Tests</h2><h3 id=goals>Goals</h3><ul><li>Validate the correctness of the code in your I/O transform.</li><li>Validate that the I/O transform works correctly when used in concert with reference implementations of the data store it connects with (where &ldquo;reference implementation&rdquo; means a fake or in-memory version).</li><li>Be able to run quickly and need only one machine, with a reasonably small memory/disk footprint and no non-local network access (preferably none at all). 
Aim for tests that run within several seconds - anything above 20 seconds should be discussed with the beam dev mailing list.</li><li>Validate that the I/O transform can handle network failures.</li></ul><h3 id=non-goals>Non-goals</h3><ul><li>Test problems in the external data store - this can lead to extremely complicated tests.</li></ul><h3 id=implementing-unit-tests>Implementing unit tests</h3><p>A general guide to writing Unit Tests for all transforms can be found in the <a href=/contribute/ptransform-style-guide/#testing>PTransform Style Guide</a>. We have expanded on a few important points below.</p><p>If you are using the <code>Source</code> API, make sure to exhaustively unit-test your code. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard for your users to detect. Also look into using <span class=language-java><code>SourceTestUtils</code></span><span class=language-py><code>source_test_utils</code></span> - it is a key piece of testing <code>Source</code> implementations.</p><p>If you are not using the <code>Source</code> API, you can use <code>TestPipeline</code> with <span class=language-java><code>PAssert</code></span><span class=language-py><code>assert_that</code></span> to help with your testing.</p><p>If you are implementing write, you can use <code>TestPipeline</code> to write test data and then read and verify it using a non-Beam client.</p><h3 id=use-fakes>Use fakes</h3><p>Instead of using mocks in your unit tests (pre-programming exact responses to each call for each test), use fakes. The preferred way to use fakes for I/O transform testing is to use a pre-existing in-memory/embeddable version of the service you&rsquo;re testing, but if one does not exist consider implementing your own. 
Fakes have proven to be the right mix of &ldquo;you can get the conditions for testing you need&rdquo; and &ldquo;you don&rsquo;t have to write a million exacting mock function calls&rdquo;.</p><h3 id=network-failure>Network failure</h3><p>To help with testing and separation of concerns, <strong>code that interacts across a network should be handled in a separate class from your I/O transform</strong>. The suggested design pattern is that your I/O transform throws exceptions once it determines that a read or write is no longer possible.</p><p>This allows the I/O transform&rsquo;s unit tests to act as if they have a perfect network connection, and they do not need to retry/otherwise handle network connection problems.</p><h2 id=batching>Batching</h2><p>If your I/O transform allows batching of reads/writes, you must force the batching to occur in your test. Having configurable batch size options on your I/O transform allows that to happen easily. These must be marked as test only.</p><h2 id=i-o-transform-integration-tests>I/O Transform Integration Tests</h2><blockquote><p>We do not currently have examples of Python I/O integration tests or integration tests for unbounded or eventually consistent data stores. 
We would welcome contributions in these areas - please contact the Beam dev@ mailing list for more information.</p></blockquote><h3 id=it-goals>Goals</h3><ul><li>Allow end to end testing of interactions between data stores, I/O transforms, and runners, simulating real world conditions.</li><li>Allow both small scale and large scale testing.</li><li>Self contained: require the least possible initial setup or existing outside state, besides the existence of a data store that the test can modify.</li><li>Anyone can run the same set of I/O transform integration tests that Beam runs on its continuous integration servers.</li></ul><h3 id=integration-tests-data-stores-and-kubernetes>Integration tests, data stores, and Kubernetes</h3><p>In order to test I/O transforms in real world conditions, you must connect to a data store instance.</p><p>The Beam community hosts the data stores used for integration tests in Kubernetes. In order for an integration test to be run in Beam&rsquo;s continuous integration environment, it must have Kubernetes scripts that set up an instance of the data store.</p><p>However, when working locally, there is no requirement to use Kubernetes. All of the test infrastructure allows you to pass in connection info, so developers can use their preferred hosting infrastructure for local development.</p><h3 id=running-integration-tests-on-your-machine>Running integration tests on your machine</h3><p>You can always run the IO integration tests on your own machine. The high level steps for running an integration test are:</p><ol><li>Set up the data store corresponding to the test being run.</li><li>Run the test, passing it connection info from the just created data store.</li><li>Clean up the data store.</li></ol><h4 id=datastore-setup-cleanup>Data store setup/cleanup</h4><p>If you&rsquo;re using Kubernetes scripts to host data stores, make sure you can connect to your cluster locally using kubectl. 
If you have your own data stores already set up, you just need to execute step 3 from the list below.</p><ol><li>Set up the data store corresponding to the test you wish to run. You can find Kubernetes scripts for all currently supported data stores in <a href=https://github.com/apache/beam/tree/master/.test-infra/kubernetes>.test-infra/kubernetes</a>.<ol><li>In some cases, there is a dedicated setup script (*.sh). In other cases, you can just run <code>kubectl create -f [scriptname]</code> to create the data store. You can also let <a href=https://github.com/apache/beam/blob/master/.test-infra/kubernetes/kubernetes.sh>kubernetes.sh</a> script perform some standard steps for you.</li><li>Convention dictates there will be:<ol><li>A yml script for the data store itself, plus a <code>NodePort</code> service. The <code>NodePort</code> service opens a port to the data store for anyone who connects to the Kubernetes cluster&rsquo;s machines from within the same subnetwork. Such scripts are typically useful when running the scripts on Minikube Kubernetes Engine.</li><li>A separate script, with LoadBalancer service. Such service will expose an <em>external ip</em> for the datastore. Such scripts are needed when external access is required (e.g. on Jenkins).</li></ol></li><li>Examples:<ol><li>For JDBC, you can set up Postgres: <code>kubectl create -f .test-infra/kubernetes/postgres/postgres.yml</code></li><li>For Elasticsearch, you can run the setup script: <code>bash .test-infra/kubernetes/elasticsearch/setup.sh</code></li></ol></li></ol></li><li>Determine the IP address of the service:<ol><li>NodePort service: <code>kubectl get pods -l 'component=elasticsearch' -o jsonpath={.items[0].status.podIP}</code></li><li>LoadBalancer service: <code>kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'</code></li></ol></li><li>Run the test using <code>integrationTest</code> gradle task and the instructions in the test class (e.g. 
see the instructions in JdbcIOIT.java).</li><li>Tell Kubernetes to delete the resources specified in the Kubernetes scripts:<ol><li>JDBC: <code>kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml</code></li><li>Elasticsearch: <code>bash .test-infra/kubernetes/elasticsearch/teardown.sh</code></li></ol></li></ol><h4 id=running-a-test>Running a particular test</h4><p><code>integrationTest</code> is a dedicated gradle task for running IO integration tests.</p><p>Example usage on Cloud Dataflow runner:</p><pre tabindex=0><code>./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions=&#39;[&#34;--project=GOOGLE_CLOUD_PROJECT&#34;, &#34;--tempRoot=GOOGLE_STORAGE_BUCKET&#34;, &#34;--numberOfRecords=1000&#34;, &#34;--postgresPort=5432&#34;, &#34;--postgresServerName=SERVER_NAME&#34;, &#34;--postgresUsername=postgres&#34;, &#34;--postgresPassword=PASSWORD&#34;, &#34;--postgresDatabaseName=postgres&#34;, &#34;--postgresSsl=false&#34;, &#34;--runner=TestDataflowRunner&#34;]&#39; -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT
</code></pre><p>Example usage on HDFS filesystem and Direct runner:</p><p>NOTE: Below setup will only work when /etc/hosts file contains entries with hadoop namenode and hadoop datanodes external IPs. Please see explanation in: <a href=https://github.com/apache/beam/blob/master/.test-infra/kubernetes/hadoop/SmallITCluster/hdfs-single-datanode-cluster.yml>Small Cluster config file</a> and <a href=https://github.com/apache/beam/blob/master/.test-infra/kubernetes/hadoop/LargeITCluster/hdfs-multi-datanode-cluster.yml>Large Cluster config file</a>.</p><pre tabindex=0><code>export HADOOP_USER_NAME=root
./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions=&#39;[&#34;--numberOfRecords=1000&#34;, &#34;--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT&#34;, &#34;--hdfsConfiguration=[{\&#34;fs.defaultFS\&#34;:\&#34;hdfs://HDFS_NAMENODE:9000\&#34;,\&#34;dfs.replication\&#34;:1,\&#34;dfs.client.use.datanode.hostname\&#34;:\&#34;true\&#34; }]&#34; ]&#39; -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT
</code></pre><p>Parameter descriptions:</p><table class=table><thead><tr><td><strong>Option</strong></td><td><strong>Function</strong></td></tr></thead><tbody><tr><td>-p sdks/java/io/file-based-io-tests/</td><td>Specifies the project submodule of the I/O to test.</td></tr><tr><td>-DintegrationTestPipelineOptions</td><td>Passes pipeline options directly to the test being run.</td></tr><tr><td>-DintegrationTestRunner</td><td>Runner to be used for running the test. Currently possible options are: direct, dataflow.</td></tr><tr><td>-Dfilesystem</td><td>(optional, where applicable) Filesystem to be used to run the test. Currently possible options are: gcs, hdfs, s3. If not provided, local filesystem will be used.</td></tr><tr><td>--tests</td><td>Specifies the test to be run (fully qualified reference to class/test method).</td></tr></tbody></table><h3 id=running-integration-tests-on-pull-requests>Running Integration Tests on Pull Requests</h3><p>Most of the IO integration tests have dedicated Jenkins jobs that run periodically to collect metrics and avoid regressions. Thanks to <a href=https://github.com/janinko/ghprb>ghprb</a> plugin it is also possible to trigger these jobs on demand once a specific phrase is typed in a Github Pull Request&rsquo;s comment. 
This way you can check if your contribution to a certain IO is an improvement or if it makes things worse (hopefully not!).</p><p>To run IO Integration Tests type the following comments in your Pull Request:</p><table class=table><thead><tr><td><strong>Test</strong></td><td><strong>Phrase</strong></td></tr></thead><tbody><tr><td>JdbcIOIT</td><td>Run Java JdbcIO Performance Test</td></tr><tr><td>MongoDBIOIT</td><td>Run Java MongoDBIO Performance Test</td></tr><tr><td>HadoopFormatIOIT</td><td>Run Java HadoopFormatIO Performance Test</td></tr><tr><td>TextIO - local filesystem</td><td>Run Java TextIO Performance Test</td></tr><tr><td>TextIO - HDFS</td><td>Run Java TextIO Performance Test HDFS</td></tr><tr><td>Compressed TextIO - local filesystem</td><td>Run Java CompressedTextIO Performance Test</td></tr><tr><td>Compressed TextIO - HDFS</td><td>Run Java CompressedTextIO Performance Test HDFS</td></tr><tr><td>AvroIO - local filesystem</td><td>Run Java AvroIO Performance Test</td></tr><tr><td>AvroIO - HDFS</td><td>Run Java AvroIO Performance Test HDFS</td></tr><tr><td>TFRecordIO - local filesystem</td><td>Run Java TFRecordIO Performance Test</td></tr><tr><td>ParquetIO - local filesystem</td><td>Run Java ParquetIO Performance Test</td></tr><tr><td>XmlIO - local filesystem</td><td>Run Java XmlIO Performance Test</td></tr><tr><td>XmlIO - HDFS</td><td>Run Java XmlIO Performance Test on HDFS</td></tr></tbody></table><p>Every job definition can be found in <a href=https://github.com/apache/beam/tree/master/.test-infra/jenkins>.test-infra/jenkins</a>.
If you modified/added new Jenkins job definitions in your Pull Request, run the seed job before running the integration test (comment: &ldquo;Run seed job&rdquo;).</p><h3 id=performance-testing-dashboard>Performance testing dashboard</h3><p>As mentioned before, we measure the performance of IOITs by gathering test execution times from Jenkins jobs that run periodically. The consequent results are stored in a database (BigQuery), therefore we can display them in the form of plots.</p><p>The dashboard gathering all the results is available here: <a href="http://metrics.beam.apache.org/d/1/getting-started?orgId=1&amp;viewPanel=123125">Performance Testing Dashboard</a></p><h3 id=implementing-integration-tests>Implementing Integration Tests</h3><p>There are three components necessary to implement an integration test:</p><ul><li><strong>Test code</strong>: the code that does the actual testing: interacting with the I/O transform, reading and writing data, and verifying the data.</li><li><strong>Kubernetes scripts</strong>: a Kubernetes script that sets up the data store that will be used by the test code.</li><li><strong>Jenkins jobs</strong>: a Jenkins Job DSL script that performs all necessary steps for setting up the data sources, running and cleaning up after the test.</li></ul><p>These three pieces are discussed in detail below.</p><h4 id=test-code>Test Code</h4><p>These are the conventions used by integration testing code:</p><ul><li><strong>Your test should use pipeline options to receive connection information.</strong><ul><li>For Java, there is a shared pipeline options object in the io/common directory. This means that if there are two tests for the same data store (e.g. 
for <code>Elasticsearch</code> and the <code>HadoopFormatIO</code> tests), those tests share the same pipeline options.</li></ul></li><li><strong>Generate test data programmatically and parameterize the amount of data used for testing.</strong><ul><li>For Java, <code>CountingInput</code> + <code>TestRow</code> can be combined to generate deterministic test data at any scale.</li></ul></li><li><strong>Use a write then read style for your tests.</strong><ul><li>In a single <code>Test</code>, run a pipeline to do a write using your I/O transform, then run another pipeline to do a read using your I/O transform.</li><li>The only verification of the data should be the result from the read. Don&rsquo;t validate the data written to the database in any other way.</li><li>Validate the actual contents of all rows in an efficient manner. An easy way to do this is by taking a hash of the rows and combining them. <code>HashingFn</code> can help make this simple, and <code>TestRow</code> has pre-computed hashes.</li><li>For easy debugging, use <code>PAssert</code>&rsquo;s <code>containsInAnyOrder</code> to validate the contents of a subset of all rows.</li></ul></li><li><strong>Tests should assume they may be run multiple times and/or simultaneously on the same database instance.</strong><ul><li>Clean up test data: do this in an <code>@AfterClass</code> to ensure it runs.</li><li>Use unique table names per run (timestamps are an easy way to do this) and per-method where appropriate.</li></ul></li></ul><p>An end to end example of these principles can be found in <a href=https://github.com/ssisk/beam/blob/jdbc-it-perf/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java>JdbcIOIT</a>.</p><h4 id=kubernetes-scripts>Kubernetes scripts</h4><p>As discussed in <a href=#integration-tests-data-stores-and-kubernetes>Integration tests, data stores, and Kubernetes</a>, to have your tests run on Beam&rsquo;s continuous integration server, you&rsquo;ll need to implement a 
Kubernetes script that creates an instance of your data store.</p><p>If you would like help with this or have other questions, contact the Beam dev@ mailing list and the community may be able to assist you.</p><p>Guidelines for creating a Beam data store Kubernetes script:</p><ol><li><strong>You should define two Kubernetes scripts.</strong><ul><li>This is the best known way to implement item #1.</li><li>The first script will contain the main datastore instance script (<code>StatefulSet</code>) plus a <code>NodePort</code> service exposing the data store. This will be the script run by the Beam Jenkins continuous integration server.</li><li>The second script will define an additional <code>LoadBalancer</code> service, used to expose an external IP address to the data store if the Kubernetes cluster is on another network. This file&rsquo;s name is usually suffixed with &lsquo;-for-local-dev&rsquo;.</li></ul></li><li><strong>You must ensure that pods are recreated after crashes.</strong><ul><li>If you use a <code>pod</code> directly, it will not be recreated if the pod crashes or something causes the cluster to move the container for your pod.</li><li>In most cases, you&rsquo;ll want to use <code>StatefulSet</code> as it supports persistent disks that last between restarts, and having a stable network identifier associated with the pod using a particular persistent disk. 
<code>Deployment</code> and <code>ReplicaSet</code> are also possibly useful, but likely in fewer scenarios since they do not have those features.</li></ul></li><li><strong>You should create separate scripts for small and large instances of your data store.</strong><ul><li>This seems to be the best way to support having both a small and large data store available for integration testing, as discussed in <a href=#small-scale-and-large-scale-integration-tests>Small Scale and Large Scale Integration Tests</a>.</li></ul></li><li><strong>You must use a Docker image from a trusted source and pin the version of the Docker image.</strong><ul><li>You should prefer images in this order:<ol><li>An image provided by the creator of the data source/sink (if they officially maintain it). For Apache projects, this would be the official Apache repository.</li><li>Official Docker images, because they have security fixes and guaranteed maintenance.</li><li>Non-official Docker images, or images from other providers that have good maintainers (e.g. <a href=https://quay.io/>quay.io</a>).</li></ol></li></ul></li></ol><h4 id=jenkins-jobs>Jenkins jobs</h4><p>You can find examples of existing IOIT Jenkins job definitions in <a href=https://github.com/apache/beam/tree/master/.test-infra/jenkins>.test-infra/jenkins</a> directory. Look for files called job_PerformanceTests_*.groovy. The most prominent examples are:</p><ul><li><a href=https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PerformanceTests_JDBC.groovy>JDBC</a> IOIT job</li><li><a href=https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PerformanceTests_MongoDBIO_IT.groovy>MongoDB</a> IOIT job</li><li><a href=https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_IT.groovy>File-based</a> IOIT jobs</li></ul><p>Notice that there is a utility class helpful in creating the jobs easily without forgetting important steps or repeating code. 
See <a href=https://github.com/apache/beam/blob/master/.test-infra/jenkins/Kubernetes.groovy>Kubernetes.groovy</a> for more details.</p><h3 id=small-scale-and-large-scale-integration-tests>Small Scale and Large Scale Integration Tests</h3><p>Apache Beam expects that it can run integration tests in multiple configurations:</p><ul><li>Small scale<ul><li>Execute on a single worker on the runner (it should be <em>possible</em> but is not required).</li><li>The data store should be configured to use a single node.</li><li>The dataset can be very small (1000 rows).</li></ul></li><li>Large scale<ul><li>Execute on multiple workers on the runner.</li><li>The datastore should be configured to use multiple nodes.</li><li>The data set used in this case is larger (10s of GBs).</li></ul></li></ul><p>You can do this by:</p><ol><li>Creating two Kubernetes scripts: one for a small instance of the data store, and one for a large instance.</li><li>Having your test take a pipeline option that decides whether to generate a small or large amount of test data (where small and large are sizes appropriate to your data store)</li></ol><p>An example of this is <a href=https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-format>HadoopFormatIO</a>&rsquo;s tests.</p><div class=feedback><p class=update>Last updated on 2024/04/25</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? 
Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a 
href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>