<!-- blob: fcecbd14ccb2b2dac824519530b1460f122b3d15 [file] [log] [blame] -->
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>IO Standards</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<!-- Site behaviour bundles. Each is loaded with `defer`, so they run in the order listed once parsing has finished. -->
<script src="/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js" defer></script>
<script src="/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js" defer></script>
<script src="/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js" defer></script>
<script src="/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js" defer></script>
<script src="/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js" defer></script>
<script src="/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js" defer></script>
<script src="/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js" defer></script>
<script src="/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js" defer></script>
<script src="/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js" defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/io/io-standards/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","//www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script><script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class="navbar-link" href="/get-started/">Get Started</a>
<a class="navbar-link" href="/documentation/">Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/io-standards.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class="navbar-link" href="/community/">Community</a>
<a class="navbar-link" href="/contribute/">Contribute</a>
<a class="navbar-link" href="/blog/">Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><!--
  Search trigger. An anchor without href is neither focusable nor announced as a
  control, so it is given a button role, a tab stop, an accessible name and an
  Enter/Space key handler that mirrors the click handler. The icon is decorative.
  Line breaks are placed only inside tags/comments so no whitespace text nodes
  are introduced between the icons.
--><a type=button role=button tabindex=0 aria-label="Search" onclick=showSearch()
  onkeydown="if(event.key==='Enter'||event.key===' '){event.preventDefault();showSearch()}"><svg
  aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><!--
  Icon-only link to the GitHub edit view of this page's source: named for
  assistive technology, and rel=noopener because it opens in a new tab.
--><a target=_blank rel=noopener aria-label="Edit this page on GitHub"
  href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/io-standards.md data-proofer-ignore><svg
  aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<!--
  Close-search control. An anchor without href is neither focusable nor
  announced as a control, so it is given a button role, a tab stop, an
  accessible name and an Enter/Space key handler that mirrors the click
  handler. The cross icon is decorative.
--><a type=button role=button tabindex=0 aria-label="Close search" onclick=endSearch()
  onkeydown="if(event.key==='Enter'||event.key===' '){event.preventDefault();endSearch()}"><svg
  aria-hidden="true" xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>
/* Search-bar and mobile-menu helpers. They are plain global functions because
   inline onclick attributes in the navigation markup invoke them by name. */

/** Sets the search prompt as the placeholder of every text input (via jQuery). */
function addPlaceholder() {
  var prompt = "What are you looking for?";
  $("input:text").attr("placeholder", prompt);
}

/** Un-hides the .searchBar element, then hides the #iconsBar element. */
function showSearch() {
  addPlaceholder();
  var searchBar = document.querySelector(".searchBar");
  searchBar.classList.remove("disappear");
  var iconsBar = document.querySelector("#iconsBar");
  iconsBar.classList.add("disappear");
}

/** Reverse of showSearch: hides .searchBar, then un-hides #iconsBar. */
function endSearch() {
  var searchBar = document.querySelector(".searchBar");
  searchBar.classList.add("disappear");
  var iconsBar = document.querySelector("#iconsBar");
  iconsBar.classList.remove("disappear");
}

/** Flips the "fixedPosition" class on <body>. */
function blockScroll() {
  $("body").toggleClass("fixedPosition");
}

/** Click handler for the mobile menu toggle: set placeholders, then toggle the body class. */
function openMenu() {
  addPlaceholder();
  blockScroll();
}
</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Concepts</span><ul class=section-nav-list><li><a href=/documentation/basics/>Basics of the Beam model</a></li><li><a href=/documentation/runtime/model/>How Beam executes a pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Beam programming guide</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/>Overview</a></li><li><a href=/documentation/programming-guide/#creating-a-pipeline>Pipelines</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>PCollections</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pcollections>Creating a PCollection</a></li><li><a href=/documentation/programming-guide/#pcollection-characteristics>PCollection characteristics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transforms</span><ul class=section-nav-list><li><a 
href=/documentation/programming-guide/#applying-transforms>Applying transforms</a></li><li><span class=section-nav-list-title>Core Beam transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pardo>ParDo</a></li><li><a href=/documentation/programming-guide/#groupbykey>GroupByKey</a></li><li><a href=/documentation/programming-guide/#cogroupbykey>CoGroupByKey</a></li><li><a href=/documentation/programming-guide/#combine>Combine</a></li><li><a href=/documentation/programming-guide/#flatten>Flatten</a></li><li><a href=/documentation/programming-guide/#partition>Partition</a></li></ul></li><li><a href=/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms>Requirements for user code</a></li><li><a href=/documentation/programming-guide/#side-inputs>Side inputs</a></li><li><a href=/documentation/programming-guide/#additional-outputs>Additional outputs</a></li><li><a href=/documentation/programming-guide/#composite-transforms>Composite transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline I/O</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pipeline-io>Using I/O transforms</a></li><li><a href=/documentation/io/connectors/>I/O connectors</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>I/O connector guides</span><ul class=section-nav-list><li><a href=/documentation/io/built-in/parquet/>Apache Parquet I/O connector</a></li><li><a href=/documentation/io/built-in/hadoop/>Hadoop Input/Output Format IO</a></li><li><a href=/documentation/io/built-in/hcatalog/>HCatalog IO</a></li><li><a href=/documentation/io/built-in/google-bigquery/>Google BigQuery I/O connector</a></li><li><a href=/documentation/io/built-in/snowflake/>Snowflake I/O connector</a></li><li><a href=/documentation/io/built-in/cdap/>CDAP I/O connector</a></li><li><a href=/documentation/io/built-in/sparkreceiver/>Spark Receiver I/O 
connector</a></li><li><a href=/documentation/io/built-in/singlestore/>SingleStoreDB I/O connector</a></li><li><a href=/documentation/io/built-in/webapis/>Web APIs I/O connector</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Developing new I/O connectors</span><ul class=section-nav-list><li><a href=/documentation/io/developing-io-overview/>Overview: Developing connectors</a></li><li><a href=/documentation/io/developing-io-java/>Developing connectors (Java)</a></li><li><a href=/documentation/io/developing-io-python/>Developing connectors (Python)</a></li><li><a href=/documentation/io/io-standards/>I/O Standards</a></li></ul></li><li><a href=/documentation/io/testing/>Testing I/O transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Schemas</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#what-is-a-schema>What is a schema</a></li><li><a href=/documentation/programming-guide/#schemas-for-pl-types>Schemas for programming language types</a></li><li><a href=/documentation/programming-guide/#schema-definition>Schema definition</a></li><li><a href=/documentation/programming-guide/#logical-types>Logical types</a></li><li><a href=/documentation/programming-guide/#creating-schemas>Creating schemas</a></li><li><a href=/documentation/programming-guide/#using-schemas>Using schemas</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data encoding and type safety</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#data-encoding-and-type-safety>Data encoding basics</a></li><li><a href=/documentation/programming-guide/#specifying-coders>Specifying coders</a></li><li><a href=/documentation/programming-guide/#default-coders-and-the-coderregistry>Default coders and the CoderRegistry</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Windowing</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#windowing>Windowing basics</a></li><li><a href=/documentation/programming-guide/#provided-windowing-functions>Provided windowing functions</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Setting your PCollection’s windowing function</a></li><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Watermarks and late data</a></li><li><a href=/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements>Adding timestamps to a PCollection’s elements</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Triggers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#triggers>Trigger basics</a></li><li><a href=/documentation/programming-guide/#event-time-triggers>Event time triggers and the default trigger</a></li><li><a href=/documentation/programming-guide/#processing-time-triggers>Processing time triggers</a></li><li><a href=/documentation/programming-guide/#data-driven-triggers>Data-driven triggers</a></li><li><a href=/documentation/programming-guide/#setting-a-trigger>Setting a trigger</a></li><li><a href=/documentation/programming-guide/#composite-triggers>Composite triggers</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Metrics</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#metrics>Metrics basics</a></li><li><a href=/documentation/programming-guide/#types-of-metrics>Types of metrics</a></li><li><a href=/documentation/programming-guide/#querying-metrics>Querying metrics</a></li><li><a href=/documentation/programming-guide/#using-metrics>Using metrics in pipeline</a></li><li><a href=/documentation/programming-guide/#export-metrics>Export metrics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>State and Timers</span><ul 
class=section-nav-list><li><a href=/documentation/programming-guide/#types-of-state>Types of state</a></li><li><a href=/documentation/programming-guide/#deferred-state-reads>Deferred state reads</a></li><li><a href=/documentation/programming-guide/#timers>Timers</a></li><li><a href=/documentation/programming-guide/#garbage-collecting-state>Garbage collecting state</a></li><li><a href=/documentation/programming-guide/#state-timers-examples>State and timers examples</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Splittable DoFns</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#sdf-basics>Basics</a></li><li><a href=/documentation/programming-guide/#sizing-and-progress>Sizing and progress</a></li><li><a href=/documentation/programming-guide/#user-initiated-checkpoint>User-initiated checkpoint</a></li><li><a href=/documentation/programming-guide/#runner-initiated-split>Runner initiated split</a></li><li><a href=/documentation/programming-guide/#watermark-estimation>Watermark estimation</a></li><li><a href=/documentation/programming-guide/#truncating-during-drain>Truncating during drain</a></li><li><a href=/documentation/programming-guide/#bundle-finalization>Bundle finalization</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Multi-language Pipelines</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#create-x-lang-transforms>Creating cross-language transforms</a></li><li><a href=/documentation/programming-guide/#use-x-lang-transforms>Using cross-language transforms</a></li><li><a href=/documentation/programming-guide/#x-lang-transform-runner-support>Runner Support</a></li></ul></li><li><a href=/documentation/programming-guide/#batched-dofns>Batched DoFns</a></li><li><a href=/documentation/programming-guide/#transform-service>Transform service</a></li></ul></li><li class=section-nav-item--collapsible><span 
class=section-nav-list-title>Pipeline development lifecycle</span><ul class=section-nav-list><li><a href=/documentation/pipelines/design-your-pipeline/>Design Your Pipeline</a></li><li><a href=/documentation/pipelines/create-your-pipeline/>Create Your Pipeline</a></li><li><a href=/documentation/pipelines/test-your-pipeline/>Test Your Pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Common pipeline patterns</span><ul class=section-nav-list><li><a href=/documentation/patterns/overview/>Overview</a></li><li><a href=/documentation/patterns/file-processing/>File processing</a></li><li><a href=/documentation/patterns/side-inputs/>Side inputs</a></li><li><a href=/documentation/patterns/pipeline-options/>Pipeline options</a></li><li><a href=/documentation/patterns/custom-io/>Custom I/O</a></li><li><a href=/documentation/patterns/custom-windows/>Custom windows</a></li><li><a href=/documentation/patterns/bigqueryio/>BigQueryIO</a></li><li><a href=/documentation/patterns/ai-platform/>AI Platform</a></li><li><a href=/documentation/patterns/schema/>Schema</a></li><li><a href=/documentation/patterns/bqml/>BigQuery ML</a></li><li><a href=/documentation/patterns/grouping-elements-for-efficient-external-service-calls/>Grouping elements for efficient external service calls</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>AI/ML pipelines</span><ul class=section-nav-list><li><a href=/documentation/ml/overview/>Get started with AI/ML</a></li><li><a href=/documentation/ml/about-ml/>About Beam ML</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Prediction and inference</span><ul class=section-nav-list><li><a href=/documentation/ml/inference-overview/>Overview</a></li><li><a href=/documentation/ml/multi-model-pipelines/>Build a pipeline with multiple models</a></li><li><a href=/documentation/ml/tensorrt-runinference>Build a custom model handler with 
TensorRT</a></li><li><a href=/documentation/ml/large-language-modeling>Use LLM inference</a></li><li><a href=/documentation/ml/multi-language-inference/>Build a multi-language inference pipeline</a></li><li><a href=/documentation/ml/side-input-updates/>Update your model in production</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data processing</span><ul class=section-nav-list><li><a href=/documentation/ml/preprocess-data/>Preprocess data</a></li><li><a href=/documentation/ml/data-processing/>Explore your data</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Workflow orchestration</span><ul class=section-nav-list><li><a href=/documentation/ml/orchestration/>Use ML-OPS workflow orchestrators</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Model training</span><ul class=section-nav-list><li><a href=/documentation/ml/per-entity-training>Per-entity training</a></li><li><a href=/documentation/ml/online-clustering/>Online clustering</a></li><li><a href=/documentation/ml/model-evaluation/>ML model evaluation</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Use cases</span><ul class=section-nav-list><li><a href=/documentation/ml/anomaly-detection/>Build an anomaly detection pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Reference</span><ul class=section-nav-list><li><a href=/documentation/ml/runinference-metrics/>RunInference metrics</a></li><li><a href=/documentation/ml/model-evaluation/>Model validation</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Runtime systems</span><ul class=section-nav-list><li><a href=/documentation/runtime/environments/>Container environments</a></li><li><a href=/documentation/runtime/resource-hints/>Resource hints</a></li><li><a 
href=/documentation/runtime/sdk-harness-config/>SDK Harness Configuration</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transform catalog</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Python</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Enrichment</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/enrichment/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-bigtable/>Bigtable example</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-vertexai/>Vertex AI Feature Store examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/python/elementwise/flatmap/>FlatMap</a></li><li><a href=/documentation/transforms/python/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/python/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/python/elementwise/map/>Map</a></li><li><a href=/documentation/transforms/python/elementwise/mltransform/>MLTransform</a></li><li><a href=/documentation/transforms/python/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/python/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/python/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/python/elementwise/reify/>Reify</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>RunInference</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/runinference/>Overview</a></li><li><a 
href=/documentation/transforms/python/elementwise/runinference-pytorch/>PyTorch examples</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-sklearn/>Sklearn examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/python/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/python/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/python/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/python/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/combineglobally/>CombineGlobally</a></li><li><a href=/documentation/transforms/python/aggregation/combineperkey/>CombinePerKey</a></li><li><a href=/documentation/transforms/python/aggregation/combinevalues/>CombineValues</a></li><li><a href=/documentation/transforms/python/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/python/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/python/aggregation/groupby/>GroupBy</a></li><li><a href=/documentation/transforms/python/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/python/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/python/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/python/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/python/aggregation/min/>Min</a></li><li><a 
href=/documentation/transforms/python/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/python/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/python/aggregation/top/>Top</a></li><li><a href=/documentation/transforms/python/aggregation/tolist/>ToList</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/other/create/>Create</a></li><li><a href=/documentation/transforms/python/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/python/other/reshuffle/>Reshuffle</a></li><li><a href=/documentation/transforms/python/other/windowinto/>WindowInto</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Java</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/java/elementwise/flatmapelements/>FlatMapElements</a></li><li><a href=/documentation/transforms/java/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/java/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/java/elementwise/mapelements/>MapElements</a></li><li><a href=/documentation/transforms/java/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/java/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/java/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/java/elementwise/reify/>Reify</a></li><li><a href=/documentation/transforms/java/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/java/elementwise/values/>Values</a></li><li><a 
href=/documentation/transforms/java/elementwise/withkeys/>WithKeys</a></li><li><a href=/documentation/transforms/java/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/java/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/java/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/combine/>Combine</a></li><li><a href=/documentation/transforms/java/aggregation/combinewithcontext/>CombineWithContext</a></li><li><a href=/documentation/transforms/java/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/java/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/java/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/java/aggregation/hllcount/>HllCount</a></li><li><a href=/documentation/transforms/java/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/java/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/java/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/java/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/java/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/java/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/java/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/other/create/>Create</a></li><li><a href=/documentation/transforms/java/other/flatten/>Flatten</a></li><li><a 
href=/documentation/transforms/java/other/passert/>PAssert</a></li><li><a href=/documentation/transforms/java/other/view/>View</a></li><li><a href=/documentation/transforms/java/other/window/>Window</a></li></ul></li></ul></li></ul></li><li><a href=/documentation/glossary/>Glossary</a></li><li><a href=https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam>Beam Wiki <img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#overview>Overview</a><ul><li><a href=#what-are-built-in-io-connectors>What are built-in I/O Connectors?</a></li></ul></li></ul><ul><li><a href=#documentation>Documentation</a><ul><li><a href=#built-in-io>Built-in I/O</a></li><li><a href=#io-not-built-in>I/O (not built-in)</a></li><li><a href=#all-sdks>All SDKs</a><ul><li><a href=#pipeline-configuration--execution--streaming--windowing-semantics-guidelines>Pipeline Configuration / Execution / Streaming / Windowing semantics guidelines</a></li></ul></li><li><a href=#java>Java</a><ul><li><a href=#general>General</a></li><li><a href=#classes--methods--properties>Classes / Methods / Properties</a></li><li><a href=#types>Types</a></li><li><a href=#evolution>Evolution</a></li></ul></li><li><a href=#python>Python</a><ul><li><a href=#general-1>General</a></li><li><a href=#classes--methods--properties-1>Classes / Methods / Properties</a></li><li><a href=#types-1>Types</a></li></ul></li><li><a href=#golang>GoLang</a><ul><li><a href=#general-2>General</a></li></ul></li><li><a href=#typescript>Typescript</a><ul><li><a href=#classes--methods--properties-2>Classes / Methods / Properties</a></li></ul></li></ul></li><li><a href=#testing>Testing</a><ul><li><a href=#unit-tests>Unit Tests</a><ul><li><a href=#suggested-test-cases>Suggested Test Cases</a></li></ul></li><li><a href=#integration-tests>Integration Tests</a><ul><li><a 
href=#suggested-test-cases-1>Suggested Test Cases</a></li></ul></li><li><a href=#performance-tests>Performance Tests</a><ul><li><a href=#dashboard>Dashboard</a></li><li><a href=#guidance-1>Guidance</a></li></ul></li></ul></li></ul></nav></nav><div class="body__contained body__section-nav arrow-list arrow-list--no-mt"><h1 id=io-standards>I/O Standards</h1><h2 id=overview>Overview</h2><p>This Apache Beam I/O Standards document lays out the prescriptive guidance for 1P/3P developers developing an Apache Beam I/O connector. These guidelines aim to create best practices encompassing documentation, development and testing in a simple and concise manner.</p><h3 id=what-are-built-in-io-connectors>What are built-in I/O Connectors?</h3><p>An I/O connector (I/O) living in the Apache Beam Github repository is known as a <strong>Built-in I/O connector</strong>. Built-in I/O’s have their <a href=#integration-tests>integration tests</a> and performance tests routinely run by the Google Cloud Dataflow Team using the Dataflow Runner and metrics published publicly for <a href=#dashboard>reference</a>. Otherwise, the following guidelines will apply to both unless explicitly stated.</p><h1 id=guidance>Guidance</h1><h2 id=documentation>Documentation</h2><p>This section lays out the superset of all documentation that is expected to be made available with an I/O. The Apache Beam documentation referenced throughout this section can be found <a href=/documentation/>here</a>. And generally a good example to follow would be the built-in I/O, <a href=/documentation/io/built-in/snowflake/>Snowflake I/O</a>.</p><h3 id=built-in-io>Built-in I/O</h3><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><td><p>Provided code docs for the relevant language of the I/O. 
This should also have links to any external sources of information within the Apache Beam site or external documentation location.<p>Examples:<ul><li><a href=https://beam.apache.org/releases/javadoc/current/overview-summary.html>Java doc</a><li><a href=https://beam.apache.org/releases/pydoc/current/>Python doc</a><li><a href=https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam>Go doc</a></ul></td></tr><tr><td><p>Add a new page under <strong>I/O connector guides</strong> that covers specific tips and configurations. The following shows those for <a href=/documentation/io/built-in/parquet/>Parquet</a>, <a href=/documentation/io/built-in/hadoop/>Hadoop</a> and others.<p>Examples:<p><img src=/images/io-standards/io-connector-guides-screenshot.png width alt="I/O connector guides screenshot" title="I/O connector guides screenshot"></img></td></tr><tr><td><p>Formatting of the section headers in your Javadoc/Pythondoc should be consistent throughout such that programmatic information extraction for other pages can be enabled in the future.<p>Example <strong>subset</strong> of sections to include in your page in order:<ol><li><a href=#before-you-start>Before you start</a><li>{Connector}IO basics<li>Supported Features<ol><li><a href=#relational>Relational</a></li></ol><li><a href=#authentication>Authentication</a><li>Reading from {Connector}<li>Writing to {Connector}<li><a href=#resource-scalability>Resource scalability</a><li><a href=#limitations>Limitations</a><li>Reporting an Issue</li></ol><p>Example:<p>The KafkaIO <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.html>JavaDoc</a></td></tr><tr id=relational><td><p>I/O Connectors should include a table under <strong>Supported Features</strong> subheader that indicates the <a href=https://2022.beamsummit.org/sessions/relational-beam/>Relational Features</a> utilized.<p>Relational Features are concepts that can help improve efficiency and can optionally be implemented by 
an I/O Connector. Using end user supplied pipeline configuration (<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/io/SchemaIO.html>SchemaIO</a>) and user query (<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/FieldAccessDescriptor.html>FieldAccessDescriptor</a>) data, relational theory is applied to derive improvements such as faster pipeline execution, lower operation costs and less data read/written.<p>Example table:<p><img src=/images/io-standards/io-connector-relational-features-table.png width alt="I/O connector relational features table" title="I/O connector relational features table"><p><div class='language-markdown snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg alt="Copy to clipboard"></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-markdown data-lang=markdown><span class=line><span class=cl><span class=p>&lt;</span><span class=nt>div</span> <span class=na>class</span><span class=o>=</span><span class=s>&#34;table-container-wrapper&#34;</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl><span class=p>&lt;</span><span class=nt>table</span> <span class=na>class</span><span class=o>=</span><span class=s>&#34;table table-bordered table-io-standards-relational-features&#34;</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>th</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;&lt;</span><span class=nt>strong</span><span class=p>&gt;</span>Relational Feature<span class=p>&lt;/</span><span class=nt>strong</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>th</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>th</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;&lt;</span><span class=nt>strong</span><span class=p>&gt;</span>Supported<span class=p>&lt;/</span><span class=nt>strong</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>th</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>th</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;&lt;</span><span class=nt>strong</span><span class=p>&gt;</span>Notes<span class=p>&lt;/</span><span class=nt>strong</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>th</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Column Pruning
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Yes/No
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>To Be Filled
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Filter Pushdown
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Yes/No
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>To Be Filled
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Table Statistics
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Yes/No
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>To Be Filled
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Partition Metadata
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Yes/No
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>To Be Filled
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Metastore
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>Yes/No
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;</span><span class=nt>p</span><span class=p>&gt;</span>To Be Filled
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>td</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl> <span class=p>&lt;/</span><span class=nt>tr</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl><span class=p>&lt;/</span><span class=nt>table</span><span class=p>&gt;</span>
</span></span><span class=line><span class=cl><span class=p>&lt;/</span><span class=nt>div</span><span class=p>&gt;</span></span></span></code></pre></div></div></div></p><p>Example implementations:<p>BigQueryIO <a href=https://github.com/apache/beam/blob/5bb13fa35b9bc36764895c57f23d3890f0f1b567/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1813>Column Pruning</a> via ProjectionPushdown to return only necessary columns indicated by an end user's query. This is achieved using BigQuery DirectRead API.</td></tr><tr><td><p>Add a page under <strong>Common pipeline patterns</strong>, if necessary, outlining common usage patterns involving your I/O.<p><a href=/documentation/patterns/bigqueryio/>https://beam.apache.org/documentation/patterns/bigqueryio/</a></td></tr><tr><td><p>Update <strong>I/O Connectors</strong> with your I/O’s information<p>Example:<p><a href=/documentation/io/connectors/#built-in-io-connectors>https://beam.apache.org/documentation/io/connectors/#built-in-io-connectors</a><p><img src=/images/io-standards/io-supported-via-screenshot.png width alt="Screenshot of the built-in I/O connectors table" title="Screenshot of the built-in I/O connectors table"></td></tr><tr id=before-you-start><td><p>Provide setup steps to use the I/O, under a <strong>Before you start</strong> header.<p>Example:<p><a href=/documentation/io/built-in/parquet/#before-you-start>https://beam.apache.org/documentation/io/built-in/parquet/#before-you-start</a></td></tr><tr><td><p>Include a canonical read/write code snippet after the initial description for each supported language. The below example shows Hadoop with examples for Java.<p>Example:<p><a href=/documentation/io/built-in/hadoop/#reading-using-hadoopformatio>https://beam.apache.org/documentation/io/built-in/hadoop/#reading-using-hadoopformatio</a></td></tr><tr><td><p>Indicate how timestamps for elements are assigned. 
This includes batch sources to allow for future I/Os which may provide more useful information than <code>current_time()</code>.<p>Example:</td></tr><tr><td><p>Indicate how timestamps are advanced; for Batch sources this will be marked as n/a in most cases.</td></tr><tr><td><p>Outline any temporary resources (for example, files) that the connector will create.<p>Example:<p>BigQuery batch loads first create a temp GCS location<p><a href=https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L455>https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L455</a></td></tr><tr id=authentication><td><p>Provide, under an <strong>Authentication</strong> subheader, how to acquire partner authorization material to securely access the source/sink.<p>Example:<p><a href=/documentation/io/built-in/snowflake/#authentication>https://beam.apache.org/documentation/io/built-in/snowflake/#authentication</a><p>Here BigQuery names it permissions but the topic covers similarities<p><a href=https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html>https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html</a></td></tr><tr><td><p>I/Os should provide links to the Source/Sink documentation within <strong>Before you start</strong> header.<p>Example:<p><a href=/documentation/io/built-in/snowflake/>https://beam.apache.org/documentation/io/built-in/snowflake/</a></td></tr><tr><td><p>Indicate if there is native or X-language support in each language with a link to the docs.<p>Example:<p>Kinesis I/O has a native implementation of java and X-language support for python but no support for Golang.</td></tr><tr id=limitations><td><p>Indicate known limitations under a <strong>Limitations</strong> header. 
If the limitation has a tracking issue, please link it inline.<p>Example:<p><a href=/documentation/io/built-in/snowflake/#limitations>https://beam.apache.org/documentation/io/built-in/snowflake/#limitations</a></td></tr></table></div><h3 id=io-not-built-in>I/O (not built-in)</h3><p>Custom I/Os are not included in the Apache Beam Github repository. Some examples would be <a href=https://github.com/SolaceProducts/solace-apache-beam>Solace</a>IO.</p><div class=table-container-wrapper><table class="table table-bordered table-connectors"><tr><td><p>Update the Other I/O Connectors for Apache Beam table with your information.<p><a href=/documentation/io/connectors/#other-io-connectors-for-apache-beam>The aforementioned table</a></td></tr></table></div><h2 id=development>Development</h2><p>This section outlines API syntax, semantics and recommendations for features that should be adopted for new as well as existing Apache Beam I/O Connectors.</p><p>The I/O Connector development guidelines are written with the following principles in mind:</p><ul><li>Consistency makes an API easier to learn.<ul><li>If there are multiple ways of doing something, we should strive to be consistent first</li></ul></li><li>With a couple minutes of studying documentation, users should be able to pick up most I/O connectors.</li><li>The design of a new I/O should consider the possibility of evolution.</li><li>Transforms should integrate well with other Beam utilities.</li></ul><h3 id=all-sdks>All SDKs</h3><h4 id=pipeline-configuration--execution--streaming--windowing-semantics-guidelines>Pipeline Configuration / Execution / Streaming / Windowing semantics guidelines</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Topic</th><th><p>Semantics</th></tr><tr><td><p>Pipeline Options</td><td><p>An I/O should rarely rely on a PipelineOptions subclass to tune internal parameters.<p>If necessary, a connector-related pipeline options class should:<ul><li>Document clearly, 
for each option, the effect it has and why one may modify it.<li>Option names must be namespaced to avoid collisions<li>Class Name: <code>{Connector}Options</code><li>Method names: <code>set{Connector}{Option}</code>, <code>get{Connector}{Option}</code></li></ul></td></tr><tr><td><p>Source Windowing</td><td><p>A source must return elements in the GlobalWindow unless explicitly parameterized in the API by the user.<p>Allowable Non-global-window patterns:<ul><li>ReadFromIO(window_by=...)<li>ReadFromIO.IntoFixedWindows(...)<li>ReadFromIO(apply_windowing=True/False) (e.g. <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.periodicsequence.html#apache_beam.transforms.periodicsequence.PeriodicImpulse>PeriodicImpulse</a>)<li>IO.read().withWindowing(...)<li>IO.read().windowBy(...)<li>IO.read().withFixedWindows(...)</li></ul></td></tr><tr><td><p>Sink Windowing</td><td><p>A sink should be Window agnostic and handle elements sent with any Windowing method, unless explicitly parameterized or expressed in its API.<p>A sink may change windowing of a PCollection internally in any way. However, the metadata that it returns as part of its result object:<ul><li>must be in the same window as the input, unless explicitly declared otherwise in the API.<li>must have accurate timestamps<li>may contain additional information about windowing (e.g. a BigQuery job may have a timestamp, but also a window associated with it).</ul><p>Allowable non-global-window patterns:<ul><li>WriteToIO(triggering_frequency=...) - e.g. 
<a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery>WriteToBigQuery</a> (This only sets the windowing within the transform - input data is still in the Global Window).<li>WriteBatchesToIO(...)<li>WriteWindowsToIO(...)</li></ul></td></tr><tr><td><p>Throttling</td><td><p>A streaming sink (or any transform accessing an external service) may implement throttling of its requests to avoid overloading the external service.<p>TODO: Beam should expose throttling utilities (<a href=https://github.com/apache/beam/issues/24743>Tracking Issue</a>):<ul><li>Per-key fixed throttling<li>Adaptive throttling with sink-reported backpressure<li>Ramp-up throttling from a start point</li></ul></td></tr><tr><td><p>Error handling</td><td><p>TODO: <a href=https://github.com/apache/beam/issues/24742>Tracking Issue</a></td></tr></table></div><h3 id=java>Java</h3><h4 id=general>General</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><td><p>The primary class used in working with the connector should be named <strong>{connector}IO</strong><p>Example:<p>The BigQuery I/O is <strong>org.apache.beam.sdk.io.bigquery.BigQueryIO</strong></td></tr><tr><td><p>The class should be placed in the package <strong>org.apache.beam.sdk.io.{connector}</strong><p>Example:<p>The BigQueryIO belongs in the java package <a href=https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java>org.apache.beam.sdk.io.bigquery</a></td></tr><tr><td><p>The integration and performance tests should live under the package <strong>org.apache.beam.sdk.io.{connector}.testing</strong>. This will cause the various tests to work with the standard user-facing interfaces of the connector.<p>Unit tests should reside in the same package (i.e. 
<strong>org.apache.beam.sdk.io.{connector}</strong>), as they may often test internals of the connector.<p>The BigQueryIO belongs in the java package <a href=https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java>org.apache.beam.sdk.io.bigquery</a></td></tr><tr><td><p>An I/O transform should avoid receiving user lambdas to map elements from a user type to a connector-specific type. Instead, they should interface with a connector-specific data type (with schema information when possible).<p>When necessary, an I/O transform should receive a type parameter that specifies the input type (for sinks) or output type (for sources) of the transform.<p>An I/O transform may omit the type parameter <strong>only if it is certain that its output type will not change</strong> (e.g. <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/FileIO.MatchAll.html>FileIO.MatchAll</a> and other <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/FileIO.html>FileIO transforms</a>).</td></tr><tr><td><p>It is highly discouraged to directly expose third-party libraries in the public API part of the I/O Connector for the following reasons:<ul><li>It reduces Apache Beam’s compatibility guarantees - Changes to third-party libraries can/will directly break existing users’ pipelines.<li>It makes code maintainability hard - If libraries are directly exposed at API level, a dependency change will require multiple changes throughout the I/O implementation code<li>It forces third-party dependencies onto end users</li></ul><p>Instead, we highly recommend exposing Beam-native interfaces and an adaptor that holds mapping logic.<p>If you believe that the library in question is extremely static in nature, 
please note it in the I/O itself.</td></tr><tr><td><p>Sources and Sinks should be abstracted with a PTransform wrapper, and internal classes be declared protected or private. By doing so implementation details can be added/changed/modified without breaking implementation by dependencies.</td></tr></table></div><h4 id=classes--methods--properties>Classes / Methods / Properties</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Java Syntax</th><th><p>Semantics</th></tr><tr><td><p>class IO.Read</td><td><p>Gives access to the class that represents reads within the I/O. The <code>Read</code> class should implement a fluent interface similar to the fluent builder pattern (e.g. <code>withX(...).withY(...)</code>). Together with default values, it provides a fail-fast interface (with immediate validation feedback after each <code>.withX()</code>) that is slightly less verbose than the builder pattern.<p>A user should <strong>not</strong> create this class directly. It should be created by a <a href=#static-method-read>top-level utility method</a>.</td></tr><tr><td><p>class IO.ReadAll</td><td><p>A few different sources implement runtime configuration for reading from a data source. This is a valuable pattern because it enables a purely batch source to become a more sophisticated streaming source.<p>As much as possible, this type of transform should have the type richness of a construction-time-configured transform:<ul><li>Support Beam Row output with a schema known at construction-time.<li>Extra configuration may be needed (and acceptable) in this case (e.g. 
a SchemaProvider parameter, a Schema parameter, a Schema Catalog or a utility of that sort).<li>The input PCollection should have a fixed type with a schema, so it can be easily manipulated by users.</li></ul><p>Example:<p><a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/jdbc/JdbcIO.ReadAll.html>JdbcIO.ReadAll</a>, <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/parquet/ParquetIO.ReadFiles.html>ParquetIO.ReadFiles</a></td></tr><tr><td><p>class IO.Write</td><td><p>Gives access to the class that represents writes within the I/O. The Write class should implement a fluent interface pattern (e.g. <code>withX(...).withY(...)</code>) as described further above for <code>IO.Read</code>.<p>A user should not create this class directly. It should be created by a <a href=#static-method-write>top-level utility method</a>.</td></tr><tr><td><p>Other Transform Classes</td><td><p>Some data storage and external systems implement APIs that do not adjust easily to Read or Write semantics (e.g. <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.html>FhirIO implements several different transforms</a> that fetch or send data to Fhir).<p>These classes should be added <strong>only if it is impossible or prohibitively difficult to encapsulate their functionality as part of extra configuration of Read, Write and ReadAll</strong> transforms, to avoid increasing the cognitive load on users.<p>A user should not create these classes directly. They should be created by a <a href=#static-method-read>top-level static method</a>.</td></tr><tr><td><p>Utility Classes</td><td><p>Some connectors rely on other user-facing classes to set configuration parameters.<p>(e.g. <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/jdbc/JdbcIO.DataSourceConfiguration.html>JdbcIO.DataSourceConfiguration</a>). 
These classes should be <strong>nested within the {Connector}IO class</strong>.<p>This format makes them visible in the main Javadoc, and easy to discover by users.</td></tr><tr id=static-method-write><td><p>Method IO&lt;T>.write()</td><td><p>The top-level I/O class will provide a <strong>static method</strong> to start constructing an I/O.Write transform. This returns a PTransform with a single input PCollection, and a Write.Result output.<p>This method should not specify in its name any of the following:<ul><li>Internal data format<li>Strategy used to write data<li>Input or output data type</li></ul><p>The above should be specified via configuration parameters if possible. <strong>If impossible</strong>, then <strong>a new static method</strong> may be introduced, but this <strong>must be exceptional</strong>.</td></tr><tr id=static-method-read><td><p>Method IO&lt;T>.read()</td><td><p>The method to start constructing an I/O.Read transform. This returns a PTransform with a single output PCollection.<p>This method should not specify in its name any of the following:<ul><li>Internal data format<li>Strategy used to read data<li>Output data type</li></ul><p>The above should be specified via configuration parameters if possible. <strong>If not possible</strong>, then <strong>a new static method</strong> may be introduced, but this <strong>must be exceptional, and documented in the I/O header as part of the API</strong>.<p>The initial static constructor method may receive parameters if these are few and general, or if they are necessary to configure the transform (e.g. 
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.html#exportResourcesToGcs-java.lang.String-java.lang.String->FhirIO.exportResourcesToGcs</a>, <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/jdbc/JdbcIO.html#readWithPartitions-org.apache.beam.sdk.values.TypeDescriptor->JdbcIO.ReadWithPartitions</a> needs a TypeDescriptor for initial configuration).</td></tr><tr id=read-from-source><td><p>IO.Read.from(source)</td><td><p>A Read transform must provide a <strong>from</strong> method where users can specify where to read from. If a transform can read from different <em>kinds</em> of sources (e.g. tables, queries, topics, partitions), then multiple implementations of this from method can be provided to accommodate this:<ul><li>IO.Read from(Query query)<li>IO.Read from(Table table) / from(String table)<li>IO.Read from(Topic topic)<li>IO.Read from(Partition partition)</li></ul><p>The input type for these methods can reflect the external source’s API (e.g. <a href=https://kafka.apache.org/27/javadoc/?org/apache/kafka/common/TopicPartition.html>Kafka TopicPartition</a> should use a <strong>Beam-implemented</strong> TopicPartition object).<p>Sometimes, there may be multiple <strong>from</strong> locations that use the same input type, which means we cannot leverage method overloading. With this in mind, use a new method to enable this situation.<ul><li>IO.Read from(String table)<li>IO.Read fromQuery(String query)</li></ul></td></tr><tr><td><p>IO.Read.fromABC(String abc)</td><td><p><strong>This pattern is discouraged if method overloading is possible</strong>, follow guidance in <strong><a href=#read-from-source>Read.from(source)</a></strong>.</td></tr><tr id=write-to-destination><td><p>IO.Write.to(destination)</td><td><p>A Write transform must provide a <strong>to</strong> method where users can specify where to write data. 
If a transform can write to different <em>kinds</em> of destinations while still using the same input element type (e.g. tables, queries, topics, partitions), then multiple implementations of this to method can be provided to accommodate this:<ul><li>IO.Write to(Query query)<li>IO.Write to(Table table) / to(String table)<li>IO.Write to(Topic topic)<li>IO.Write to(Partition partition)</li></ul><p>The input type for these methods can reflect the external sink's API (e.g. <a href=https://kafka.apache.org/27/javadoc/?org/apache/kafka/common/TopicPartition.html>Kafka TopicPartition</a> should use a <strong>Beam-implemented</strong> TopicPartition object).<p>If different kinds of destinations require different types of input object types, then these should be done in separate I/O connectors.<p>Sometimes, there may be multiple <strong>to</strong> locations that use the same input type, which means we cannot leverage method overloading. With this in mind, use a new method to enable this situation.<ul><li>IO.Write to(String table)<li>IO.Write toTable(String table)</li></ul></td></tr><tr><td><p>IO.Write.to(DynamicDestination destination)</td><td><p>A write transform may enable writing to more than one destination. This can be a complicated pattern that should be implemented carefully (it is the preferred pattern for connectors that will likely have multiple destinations in a single pipeline).<p>The preferred pattern for this is to define a DynamicDestinations interface (e.g. 
<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.html>BigQueryIO.DynamicDestinations</a>) that will allow the user to define all necessary parameters for the configuration of the destination.<p>The DynamicDestinations interface also allows maintainers to add new methods over time (with <strong>default implementations</strong> to avoid breaking existing users) that will define extra configuration parameters if necessary.</td></tr><tr><td><p>IO.Write.toABC(destination)</td><td><p><strong>This pattern is discouraged if method overloading is possible</strong>, follow guidance in <strong><a href=#write-to-destination>Write.to(destination)</a></strong>.</td></tr><tr><td><p>class IO.Read.withX<p>IO.Write.withX</td><td><p>withX provides a method for configuration to be passed to the Read method, where X represents the configuration to be created. With the exception of generic with statements (defined below) the I/O should attempt to match the name of the configuration option with that of the option name in the source.<p>These methods should return a new instance of the I/O rather than modifying the existing instance.<p>Example:<p><a href=https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/TextIO.Read.html#withCompression-org.apache.beam.sdk.io.Compression->TextIO.Read.withCompression</a></td></tr><tr><td><p>IO.Read.withConfigObject<p>IO.Write.withConfigObject</td><td><p>Some connectors in Java receive a configuration object as part of their configuration. <strong>This pattern is encouraged only for particular cases</strong>. In most cases, a connector can hold all necessary configuration parameters at the top level.<p>To determine whether a multi-parameter configuration object is an appropriate parameter for a high level transform, the configuration object must:<ul><li>Hold only properties related to the connection/authentication parameters for the external data store (e.g. 
JdbcIO.DataSourceConfiguration).<ul><li>Generally, <strong>secrets should not be passed as parameters</strong>, unless an alternative is not feasible. For secret management, a secret-management service or KMS is the recommended approach.</ul><li><strong>Or </strong>mirror an API characteristic from the external data source (e.g. KafkaIO.Read.withConsumerConfigUpdates), without exposing that external API in the Beam API.<ul><li>The method should mirror the name of the API object (e.g. given an object SubscriptionStatConfig, a method would be withSubscriptionStatConfig).</ul><li><strong>Or</strong> when a connector can support different configuration ‘paths’ where a particular property requires other properties to be specified (e.g. BigQueryIO’s method will entail various different properties). (see last examples).</li></ul><p>Example:<p><a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/jdbc/JdbcIO.DataSourceConfiguration.html>JdbcIO.DataSourceConfiguration</a>, <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.html>SpannerConfig</a>, <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#withConsumerConfigUpdates-java.util.Map->KafkaIO.Read.withConsumerConfigUpdates</a><p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre tabindex=0><code>BigQueryIO.write()
.withWriteConfig(FileLoadsConfig.withAvro()
.withTriggeringFrequency()...)
BigQueryIO.write()
.withWriteConfig(StreamingInsertsConfig.withDetailedError()
.withExactlyOnce().etc..)</code></pre></div></div></p></td></tr><tr><td><p>class IO.Write.withFormatFunction</td><td><p><strong>Discouraged - except for dynamic destinations</strong><p>For sinks that can receive Beam Row-typed PCollections, the format function should not be necessary, because Beam should be able to format the input data based on its schema.<p>For sinks providing Dynamic Destination functionality, elements may carry data that helps determine their destination. These data may need to be removed before writing to their final destination.<p>To include this method, a connector should:<ul><li>Show that it’s not possible to perform data matching automatically.<li>Support Dynamic Destinations and need changes to the input data due to that reason.</li></ul></td></tr><tr><td><p>IO.Read.withCoder<p>IO.Write.withCoder</td><td><p><strong>Strongly Discouraged</strong><p>Sets the coder to use to encode/decode the element type of the output / input PCollection of this connector. In general, it is recommended that sources will:<ol><li>Return Row objects with a schema that is automatically inferred.<li>Automatically set the necessary coder by having fixed output/input types, or inferring their output/input types.</li></ol><p>If neither #1 nor #2 is possible, then a <code>withCoder(...)</code> method can be added.</td></tr><tr><td><p>IO.ABC.withEndpoint / with{IO}Client / withClient</td><td><p>Connector transforms should provide a method to override the interface between themselves and the external system that they communicate with. This can enable various uses:
<ul><li>Local testing by mocking the destination service<li>User-enabled metrics, monitoring, and security handling in the client.<li>Integration testing based on emulators</li></ul><p>Example:<p><a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTestServices-org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices->BigQueryIO.Write.withTestServices</a>(<a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.html>BigQueryServices</a>)</td></tr></table></div><h4 id=types>Types</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Java Syntax</th><th><p>Semantics</th></tr><tr><td><p>Method IO.Read.expand</td><td><p>The expand method of a Read transform must return a PCollection object with a type. The type may be parameterized or fixed to a class.<p>A user should <strong>not</strong> create this class directly. It should be created by a <a href=#static-method-read>top-level utility method</a>.</td></tr><tr><td><p>Method IO.Read.expand’s PCollection type</td><td><p>The type of the PCollection will usually be one of the following four options. For each of these options, the encoding / data is recommended to be as follows:<ul><li>A pre-defined, basic Java type (e.g. String)<ul><li>This encoding should be simple, and use a simple Beam coder (e.g. Utf8StringCoder)</li></ul><li>A pre-set POJO type (e.g. <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/fs/MatchResult.Metadata.html>Metadata</a>) with a schema<ul><li>The preferred strategy for these is to define the output type as an <a href=https://stackoverflow.com/questions/62546191/how-do-i-use-an-autovalue-data-type-for-my-pcollection-in-apache-beam>@AutoValue, with @DefaultSchema and @SchemaCreate</a> annotations. 
This will ensure compact, fast encoding with RowCoder.</li></ul><li>A Beam Row with a specific schema<li>A type with a schema that’s not known at construction time</li></ul><p>In all cases, asking a user to pass a coder (e.g. <code>withCoder(...)</code>) is <strong>discouraged</strong>.</td></tr><tr><td><p>Method IO.Write.expand</td><td><p>The expand method of any write transform must return a type IO.Write.Result object that extends a PCollectionTuple. This object allows transforms to return metadata about the results of its writing and allows this write to be followed by other PTransforms.<p>If the Write transform would not need to return any metadata, a Write.Result object <strong>is still preferable</strong>, because it will allow the transform to evolve its metadata over time.<p>Examples of metadata:<ul><li>Failed elements and errors<li>Successfully written elements<li>API tokens from calls issued by the transform</li></ul><p>Examples:<p>BigQueryIO’s <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.html>WriteResult</a></td></tr></table></div><h4 id=evolution>Evolution</h4><p>Over time, I/Os need to evolve to address new use cases, or use new APIs under the covers. Some examples of necessary evolution of an I/O:</p><ul><li>A new data type needs to be supported within it (e.g. 
<a href=https://github.com/apache/beam/pull/15848>any-type partitioning in JdbcIO.ReadWithPartitions</a>)</li><li>A new backend API needs to be supported</li></ul><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Java Syntax</th><th><p>semantics</th></tr><tr><td><p>Top-level static methods</td><td><p>In general, one should resist adding a completely new static method for functionality that can be captured as configuration within an existing method.<p>An example of too many top-level methods that could be supported via configuration is <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.html>PubsubIO</a><p>A new top-level static method should only be added in the following cases:<ul><li>A new PTransform is being added that cannot / does not make sense to be supported as a simple configuration parameter of the existing PTransforms (e.g. <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.html>FhirIO’s various transforms</a>, any ReadAll transform).<li>A feature that represents a new recommended standard to use the transform (e.g. <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/jdbc/JdbcIO.html>JdbcIO’s ReadWithPartitions</a>)<li>A change in the usual way of interacting with this transform, or a parameter that’s necessary to initialize the transform (e.g. <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html>BigQueryIO.read(...) 
method vs read()</a>)</li></ul></td></tr></table></div><h3 id=python>Python</h3><h4 id=general-1>General</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><td><p>If the I/O lives in Apache Beam it should be placed in the package <strong>apache_beam.io.{connector}</strong> or <strong>apache_beam.io.{namespace}.{connector}</strong><p>Example:<p>apache_beam.io.fileio and apache_beam.io.gcp.bigquery</td></tr><tr><td><p>There will be a module named {connector}.py which is the primary entry point used in working with the connector in a pipeline <strong>apache_beam.io.{connector}</strong> or <strong>apache_beam.io.{namespace}.{connector}</strong><p>Example:<p>apache_beam.io.gcp.bigquery / apache_beam/io/gcp/bigquery.py<p>Another possible layout: apache_beam/io/gcp/bigquery/bigquery.py (automatically import public classes in bigquery/__init__.py)</td></tr><tr><td><p>The connector must define an <code>__all__</code> attribute in its main file, and export only classes and methods meant to be accessed by users.</td></tr><tr><td><p>If the I/O implementation exists in a single module (a single file), then the file {connector}.py can hold it.<p>Otherwise, the connector code should be defined within a directory (connector package) with an __init__.py file that documents the public API.<p>If the connector defines other files containing utilities for its implementation, these files must clearly document the fact that they are not meant to be a public interface.</td></tr></table></div><h4 id=classes--methods--properties-1>Classes / Methods / Properties</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Python Syntax</th><th><p>semantics</th></tr><tr><td><p>callable ReadFrom{Connector}</td><td><p>This gives access to the PTransform to read from a given data source. It allows you to configure it via the arguments that it receives. 
For long lists of optional parameters, they may be defined as parameters with a default value.<p>Q. Java uses a builder pattern. Why can’t we do that in Python? <a href=https://stackoverflow.com/a/11977454/1255356>Optional parameters can serve the same role</a> in Python.<p>Example:<p><a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html>apache_beam.io.gcp.bigquery.ReadFromBigQuery</a></td></tr><tr><td><p>callable ReadAllFrom{Connector}</td><td>A few different sources implement runtime configuration for reading from a data source. This is a valuable pattern because it enables a purely batch source to become a more sophisticated streaming source.<p>As much as possible, this type of transform should have the type richness and safety of a construction-time-configured transform:<ul><li>Support output with a schema known at construction-time<ul><li>Extra configuration may be needed (and acceptable) in this case (e.g. a SchemaProvider parameter, a Schema parameter, a Schema Catalog or a utility of that sort).</li></ul><li>The input PCollection should have a fixed type with a schema, so it can be easily manipulated by users.</li></ul><p>Example:<p><a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html#readallfrombigquery>ReadAllFromBigQuery</a></td></tr><tr><td><p>callable WriteTo{Connector}</td><td><p>This gives access to the PTransform to write into a given data sink. It allows you to configure it via the arguments that it receives. For long lists of optional parameters, they may be defined as parameters with a default value.<p>Q. Java uses a builder pattern. Why can’t we do that in Python? 
<a href=https://stackoverflow.com/a/11977454/1255356>Optional parameters can serve the same role</a> in Python.<p>Example:<p><a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html>apache_beam.io.gcp.bigquery.WriteToBigQuery</a></td></tr><tr><td><p>callables Read/Write</td><td><p>A top-level transform initializer (ReadFromIO/ReadAllFromIO/WriteToIO) must aim to require the fewest possible parameters, to simplify its usage, and allow users to use them quickly.</td></tr><tr><td><p>parameter ReadFrom{Connector}({source})<p>parameter WriteTo{Connector}({sink})</td><td><p>The first parameter in a Read or Write I/O connector must specify the source for readers or the destination for writers.<p>If a transform can read from different <em>kinds</em> of sources (e.g. tables, queries, topics, partitions), then the suggested approaches by order of preference are:<ol><li>Retain a single argument, but auto-infer the source/sink type (e.g. <a href=https://pandas.pydata.org/docs/reference/api/pandas.read_sql.html#pandas-read-sql><code>pandas.read_sql(...)</code></a> supporting tables and queries)<li>Add a new argument for each possible source/sink type (e.g. ReadFromBigQuery having query/table parameters)</li></ol></td></tr><tr><td><p>parameter WriteToIO(destination={multiple_destinations})</td><td><p>A write transform may enable writing to more than one destination. This can be a complicated pattern that should be implemented carefully (it is the preferred pattern for connectors that will likely have multiple destinations in a single pipeline).<p>The preferred API pattern in Python is to pass callables (e.g. <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery>WriteToBigQuery</a>) for all parameters that will need to be configured. 
In general, examples of callable parameters may be:<ul><li>Destination callable → Should receive an element, and return a destination for that element<li>Other examples<ul><li>Schema callable → Should receive a destination and return a schema for the destination<li>Format function → Should receive a record (and maybe a destination) and format the record to be inserted.</li></ul></li></ul><p>Using these callables also allows maintainers to add new parameterizable callables over time (with <strong>default values</strong> to avoid breaking existing users) that will define extra configuration parameters if necessary.<p><strong>Corner case</strong>: It is often necessary to pass side inputs to some of these callables. The recommended pattern is to have an extra parameter in the constructor to include these side inputs (e.g. <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery>WriteToBigQuery’s table_side_inputs parameter</a>)</td></tr><tr><td><p>parameter ReadFromIO(param={param_val})<p>parameter WriteToIO(param={param_val})</td><td><p>Any additional configuration can be added as optional parameters in the constructor of the I/O. Whenever possible, mandatory extra parameters should be avoided. Optional parameters should have reasonable default values, so that picking up a new connector will be as easy as possible.</td></tr><tr><td><p>parameter ReadFromIO(config={config_object})</td><td><p><strong>Discouraged</strong><p>Some connectors in Python may receive a complex configuration object as part of their configuration. 
<strong>This pattern is discouraged</strong>, because a connector can hold all necessary configuration parameters at the top level.<p>To determine whether a multi-parameter configuration object is an appropriate parameter for a high level transform, the configuration object must:<ul><li>Hold only properties related to the connection/authentication parameters for the external data store (e.g. <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html#apache_beam.io.kafka.ReadFromKafkaSchema>ReadFromKafka(consumer_config parameter)</a>)<li><strong>Or </strong>mirror an API characteristic from the external data source (e.g. <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.io.debezium.html>ReadFromDebezium(connector_properties parameter)</a>)</li></ul></td></tr></table></div><h4 id=types-1>Types</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Python Syntax</th><th><p>Semantics</th></tr><tr><td><p>Output of method ReadFromIO.expand</td><td><p>The expand method of a Read transform must return a PCollection object with a type, and be annotated with the type. Preferred PCollection types in Python are (in order of preference):<p>Simple Python types, if possible (bytes, str, numbers)<p>For complex types:<ol><li>A NamedTuple or DataClass with a set schema, encoded with RowCoder<li>A Python dictionary<ol><li>The dictionaries should be encoded via RowCoder, if possible.</li></ol><li>A preset Python class, if a schema is not possible</li></ol></td></tr><tr><td><p>Output of method WriteToIO.expand</td><td><p>The expand method of any write transform must return a Python object with a <strong>fixed class type</strong>. The recommended name for the class is <strong>WriteTo{IO}Result</strong>. 
This object allows transforms to return metadata about the results of its writing.<p>If the Write transform would not need to return any metadata, a Python object with a class type <strong>is still preferable</strong>, because it will allow the transform to evolve its metadata over time.<p>Examples of metadata:<ul><li>Failed elements and errors<li>Successfully written elements<li>API tokens from calls issued by the transform</li></ul><p>Example:<p>BigQueryIO’s <a href=https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.html>WriteResult</a><p>Motivating example (a bad pattern): WriteToBigQuery’s inconsistent dictionary results[<a href=https://github.com/apache/beam/blob/v2.34.0/sdks/python/apache_beam/io/gcp/bigquery.py#L2138>1</a>][<a href=https://github.com/apache/beam/blob/b3b184361100492f92fcc51ff82a0fcd962d5ee0/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L1203-L1207>2</a>]</td></tr><tr><td><p>Input of method WriteToIO.expand</td><td><p>The expand method of a Write transform must accept a PCollection object with a type, and be annotated with the type. 
Preferred PCollection types in Python are the same as the output types for a ReadFromIO referenced in T1.</td></tr></table></div><h3 id=golang>GoLang</h3><h4 id=general-2>General</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><td><p>If the I/O lives in Apache Beam it should be placed in the package:<p><strong>{connector}io</strong><p>Example:<p><a href=https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/avroio/avroio.go>avroio</a> and <a href=https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/bigqueryio/bigquery.go>bigqueryio</a></td></tr><tr><td><p>Integration and Performance tests should live under the same package as the I/O itself<p><strong>{connector}io</strong></td></tr></table></div><h3 id=typescript>Typescript</h3><h4 id=classes--methods--properties-2>Classes / Methods / Properties</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Typescript Syntax</th><th><p>Semantics</th></tr><tr><td><p>function readFromXXX</td><td><p>The method to start constructing an I/O.Read transform.</td></tr><tr><td><p>function writeToXXX</td><td><p>The method to start constructing an I/O.Write transform.</td></tr></table></div><h2 id=testing>Testing</h2><p>An I/O should have unit tests, integration tests, and performance tests. In the following guidance we explain what each type of test aims to achieve, and provide a <em>baseline</em> standard of test coverage. Do note that the actual test cases and business logic of the actual test would vary depending on specifics of each source/sink but we have included some suggested test cases as a baseline.</p><p>This guide complements the <a href=https://beam.apache.org/documentation/io/testing/>Apache Beam I/O transform testing guide</a> by adding specific test cases and scenarios. 
For general information regarding testing Beam I/O connectors, please refer to that guide.</p><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><td><p>Integration and performance tests should live under the package org.apache.beam.sdk.io.{connector}.testing. This will cause the various tests to work with the standard user-facing interfaces of the connector.<p>Unit tests should reside in the same package (i.e. org.apache.beam.sdk.io.{connector}), as they may often test internals of the connector.</td></tr></table></div><h3 id=unit-tests>Unit Tests</h3><p>I/O unit tests need to efficiently test the functionality of the code. Given that unit tests are expected to be executed many times over multiple test suites (for example, for each Python version) these tests should execute relatively fast and should not have side effects. We recommend trying to achieve 100% code coverage through unit tests.</p><p>When possible, unit tests are favored over integration tests due to faster execution time and low resource usage. Additionally, unit tests can be easily included in pre-commit test suites (for example, Jenkins <strong>beam_PreCommit_*</strong> test suites), and hence have a better chance of discovering regressions early. 
Unit tests are also preferred for error conditions.</p><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><td><p>The unit testing class should be part of the same package as the IO and named {connector}IOTest.<p>Example:<p><a href=https://github.com/apache/beam/blob/v2.43.0/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java>sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java</a></td></tr></table></div><h4 id=suggested-test-cases>Suggested Test Cases</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Functionality to test</th><th><p>Description</th><th><p>Example(s)</th></tr><tr><td><p>Reading with default options</td><td><p>Preferably runs a pipeline locally using DirectRunner and a fake of the data store. But can be a unit test of the source transform using mocks.</td><td><p><a href=https://github.com/apache/beam/blob/f9ae6d53e2e6ad8346cee955d646f7198dbb6502/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java#L473>BigtableIOTest.testReading</a><p><a href=https://github.com/apache/beam/blob/cb28a5b0265a04d60ad005684d0fbb4db74128f2/sdks/python/apache_beam/io/gcp/pubsub_test.py#L477>pubsub_test.TestReadFromPubSub.test_read_messages_success</a><p><a href=https://github.com/apache/beam/blob/689e70b5131620540faf52e2f1e2dca7a36f269d/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java#L320>CassandraIOTest.testRead</a></td></tr><tr><td><p>Writing with default options</td><td><p>Preferably runs a pipeline locally using DirectRunner and a fake of the data store. 
But can be a unit test of the sink transform using mocks.</td><td><p><a href=https://github.com/apache/beam/blob/f9ae6d53e2e6ad8346cee955d646f7198dbb6502/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java#L1184>BigtableIOTest.testWriting</a><p><a href=https://github.com/apache/beam/blob/cb28a5b0265a04d60ad005684d0fbb4db74128f2/sdks/python/apache_beam/io/gcp/pubsub_test.py#L810>pubsub_test.TestWriteToPubSub.test_write_messages_success</a></td></tr><tr><td><p>Reading with additional options</td><td><p>For every option available to users.</td><td><p><a href=https://github.com/apache/beam/blob/f9ae6d53e2e6ad8346cee955d646f7198dbb6502/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java#L620>BigtableIOTest.testReadingWithFilter</a></td></tr><tr><td><p>Writing with additional options</td><td><p>For every option available to users. For example, writing to dynamic destinations.</td><td><p><a href=https://github.com/apache/beam/blob/5b3f70bec72b6b646fe97d4eb7f8bd715dd562a8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java#L1410>BigTableIOTest.testReadWithBigTableOptionsSetsRetryOptions</a><p><a href=https://github.com/apache/beam/blob/cd05896ebc385d12f7a7801f3bbba0127bef8b3b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java#L270>BigQueryIOWriteTest.testWriteDynamicDestinations</a></td></tr><tr><td><p>Reading additional element types</td><td><p>If the data store read schema supports different data types.</td><td><p><a href=https://github.com/apache/beam/blob/5b3f70bec72b6b646fe97d4eb7f8bd715dd562a8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java#L434>BigQueryIOReadTest.testReadTableWithSchema</a></td></tr><tr><td><p>Writing additional element types</td><td><p>If the data store write schema supports 
different data types.</td><td></td></tr><tr><td><p>Display data</td><td><p>Tests that the source/sink populates display data correctly.</td><td><p><a href=https://github.com/apache/beam/blob/8bda63bc8ea0c1de9ec29d0da080df1769c65a2b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroIOTest.java#L174>AvroIOTest.testReadDisplayData</a><p><a href=https://github.com/apache/beam/blob/f9ae6d53e2e6ad8346cee955d646f7198dbb6502/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java#L220>DatastoreV1Test.testReadDisplayData</a><p><a href=https://github.com/apache/beam/blob/4012a46d3aa7b2a4c628f1352c8b579733c71b41/sdks/python/apache_beam/io/gcp/bigquery_test.py#L187>bigquery_test.TestBigQuerySource.test_table_reference_display_data</a></td></tr><tr><td><p>Initial splitting</td><td><p>There can be many variations of these tests. Please refer to examples for details.</td><td><p><a href=https://github.com/apache/beam/blob/05f9e775b8970602760aa56644285741c70190d0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java#L659>BigQueryIOReadTest.testBigQueryQuerySourceInitSplit</a><p><a href=https://github.com/apache/beam/blob/cb28a5b0265a04d60ad005684d0fbb4db74128f2/sdks/python/apache_beam/io/avroio_test.py#L241>avroio_test.AvroBase.test_read_with_splitting</a></td></tr><tr><td><p>Dynamic work rebalancing</td><td><p>There can be many variations of these tests. 
Please refer to examples for details.</td><td><p><a href=https://github.com/apache/beam/blob/09bbb48187301f18bec6d9110741c69b955e2b5a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java#L670>BigTableIOTest.testReadingSplitAtFractionExhaustive</a><p><a href=https://github.com/apache/beam/blob/cb28a5b0265a04d60ad005684d0fbb4db74128f2/sdks/python/apache_beam/io/avroio_test.py#L309>avroio_test.AvroBase.test_dynamic_work_rebalancing_exhaustive</a></td></tr><tr><td><p>Schema support</td><td><p>Reading a PCollection&lt;Row> or writing a PCollection&lt;Row><p>Should verify retrieving schema from a source, and pushing/verifying the schema for a sink.</td><td><p><a href=https://github.com/apache/beam/blob/6ba27f0b86e464d912a16b17b554fe68db03bc69/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java#L434>BigQueryIOReadTest.testReadTableWithSchema</a><p><a href=https://github.com/apache/beam/blob/6ba27f0b86e464d912a16b17b554fe68db03bc69/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java#L1129>BigQueryIOWriteTest.testSchemaWriteLoads</a></td></tr><tr><td><p>Validation test</td><td><p>Tests that source/sink transform is validated correctly, i.e. 
incorrect/incompatible configurations are rejected with actionable errors.</td><td><p><a href=https://github.com/apache/beam/blob/cd05896ebc385d12f7a7801f3bbba0127bef8b3b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java#L1560>BigQueryIOWriteTest.testWriteValidatesDataset</a><p><a href=https://github.com/apache/beam/blob/d99ebb69f7adc0afe1ffc607d1bbdda80eb2e08a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java#L107>PubsubIOTest.testTopicValidationSuccess</a></td></tr><tr><td><p>Metrics</td><td><p>Confirm that various read/write metrics get set</td><td><p><a href=https://github.com/apache/beam/blob/c57c983c8ae7d84926f9cf42f7c40af8eaf60545/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java#L333>SpannerIOReadTest.testReadMetrics</a><p><a href=https://github.com/apache/beam/blob/25e6008e8919c2f31eaebae2662b44e02f9f37a1/sdks/python/apache_beam/io/gcp/bigtableio_test.py#L59>bigtableio_test.TestWriteBigTable.test_write_metrics</a></td></tr><tr><td><p>Read All</td><td><p>Test read all (PCollection&lt;Read Config>) version of the test works</td><td><p><a href=https://github.com/apache/beam/blob/09bbb48187301f18bec6d9110741c69b955e2b5a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java#L503>SpannerIOReadTest.readAllPipeline</a><p><a href=https://github.com/apache/beam/blob/689e70b5131620540faf52e2f1e2dca7a36f269d/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java#L381>CassandraIOTest.readAllQuery</a></td></tr><tr><td><p>Sink batching test</td><td><p>Make sure that sinks batch data before writing if the sinks perform batching for performance reasons.</td><td><p><a 
href=https://github.com/apache/beam/blob/c57c983c8ae7d84926f9cf42f7c40af8eaf60545/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java#L1200>SpannerIOWriteTest.testBatchFn_cells</a></td></tr><tr><td><p>Error handling</td><td><p>Make sure that various errors (for example, HTTP error codes) from a data store are handled correctly</td><td><p><a href=https://github.com/apache/beam/blob/cd05896ebc385d12f7a7801f3bbba0127bef8b3b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java#L2185>BigQueryIOWriteTest.testExtendedErrorRetrieval</a></td></tr><tr><td><p>Retry policy</td><td><p>Confirms that the source/sink retries requests as expected</td><td><p><a href=https://github.com/apache/beam/blob/cd05896ebc385d12f7a7801f3bbba0127bef8b3b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java#L822>BigQueryIOWriteTest.testRetryPolicy</a></td></tr><tr><td><p>Output PCollection from a sink</td><td><p>Sinks should produce a PCollection that subsequent steps could depend on.</td><td><p><a href=https://github.com/apache/beam/blob/cd05896ebc385d12f7a7801f3bbba0127bef8b3b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java#L1912>BigQueryIOWriteTest.testWriteTables</a></td></tr><tr><td><p>Backlog byte reporting</td><td><p>Tests to confirm that the unbounded source transforms report backlog bytes correctly.</td><td><p><a href=https://github.com/apache/beam/blob/cc0b2c5f3529e1896778dddeb6c740d40c7fb977/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java#L170>KinesisReaderTest.getSplitBacklogBytesShouldReturnBacklogUnknown</a></td></tr><tr><td><p>Watermark reporting</td><td><p>Tests to confirm that the unbounded source transforms report the watermark correctly.</td><td><p><a 
href=https://github.com/apache/beam/blob/64dc9c62dce2e5af1f52c93a04702f17bfa89a60/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java#L40>WatermarkPolicyTest.shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords</a></td></tr></table></div><h3 id=integration-tests>Integration Tests</h3><p>Integration tests test end-to-end interactions between the Beam runner and the data store a given I/O connects to. Since these usually involve remote RPC calls, integration tests take a longer time to execute. Additionally, Beam runners may use more than one worker when executing integration tests. Due to these costs, an integration test should only be implemented when a given scenario cannot be covered by a unit test.</p><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><td><p><strong>Implementing at least one integration test that involves interactions between Beam and the external storage system is required for submission.</strong></td></tr><tr><td><p>For I/O connectors that involve both a source and a sink, the Beam guide recommends implementing tests in the write-then-read form so that both read and write can be covered by the same test pipeline.</td></tr><tr><td><p>The integration testing class should be part of the same package as the I/O and named <strong>{connector}IOIT</strong>.<p>For example:<p><a href=https://github.com/apache/beam/blob/689e70b5131620540faf52e2f1e2dca7a36f269d/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java>sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java</a></td></tr></table></div><h4 id=suggested-test-cases-1>Suggested Test Cases</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><th><p>Test type</th><th><p>Description</th><th><p>Example(s)</th></tr><tr><td><p>“Write then read” test using Dataflow</td><td><p>Writes generated data to the datastore 
and reads the same data back from the datastore using Dataflow.</td><td><p><a href=https://github.com/apache/beam/blob/774008de21090c635dc23c58b2f7d9d4aaa40cbf/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java#L129>JdbcIOIT.testWriteThenRead</a></td></tr><tr><td><p>“Write then read all” test using Dataflow</td><td><p>Same as “write then read” but for sources that support reading a PCollection of source configs. All future (SDF) sources are expected to support this.<p>If the same transform is used for “read” and “read all” forms or if the two transforms are essentially the same (for example, the read transform is a simple wrapper of the read all or vice versa), just adding a single “read all” test should be sufficient.</td><td><p><a href=https://github.com/apache/beam/blob/17a5c26bc0cbb57139d69683eaefc4b998c15866/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java#L173>SpannerReadIT.testReadAllRecordsInDb</a></td></tr><tr><td><p>Unbounded write then read using Dataflow</td><td><p>A pipeline that continuously writes and reads data. Such a pipeline should be canceled to verify the results. 
This is only for connectors that support unbounded read.</td><td><p><a href=https://github.com/apache/beam/blob/95e9de7891593fa73d582cb6ba5ac0333b2675ff/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java#L134>KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInStreaming</a></td></tr></table></div><h3 id=performance-tests>Performance Tests</h3><p><strong>Because the Performance testing framework is still in flux, performance tests can be a follow-up submission after the actual I/O code.</strong></p><p><strong>The Performance testing framework does not yet support GoLang or Typescript.</strong></p><p>Performance benchmarks are a critical part of best practices for I/Os as they effectively address several areas:</p><ul><li>To evaluate if the cost and performance of a specific I/O or dataflow template meets the customer’s business requirements.</li><li>To illustrate performance regressions and improvements to I/O or dataflow templates between code changes.</li><li>To help end customers estimate costs and plan capacity to meet their SLOs.</li></ul><h4 id=dashboard>Dashboard</h4><p>Google runs performance tests routinely for built-in I/Os and publishes them to an externally viewable dashboard for <a href="https://s.apache.org/beam-community-metrics/d/bnlHKP3Wz/java-io-it-tests-dataflow?orgId=1">Java</a> and <a href="https://s.apache.org/beam-community-metrics/d/gP7vMPqZz/python-io-it-tests-dataflow?orgId=1">Python</a>.</p><p><img src=/images/io-standards/io-connector-performance-test-dashboard-screenshot.png alt="Dataflow performance test dashboard" title="Dataflow performance test dashboard"><h4 id=guidance-1>Guidance</h4><div class=table-container-wrapper><table class="table table-bordered table-io-standards"><tr><td><p>Use the same tests for integration and performance tests when possible. Performance tests are usually the same as an integration test but involve a larger volume of data. 
Testing frameworks (internal and external) provide features to track performance benchmarks related to these tests and provide dashboards/tooling to detect anomalies.</td></tr><tr id=resource-scalability><td><p>Include a <strong>Resource Scalability</strong> section in your page under the <strong><a href=#built-in-io>Built-in I/O connector guides</a></strong> documentation, which will indicate the upper bounds that the I/O has integration tests for.<p>For example:<p>An indication that KafkaIO has integration tests with <strong>xxxx</strong> topics. The documentation can state if the connector authors believe that the connector can scale beyond the integration test number; however, this will make it clear to the user the limits of the tested paths.<p>The documentation should clearly indicate the configuration that was followed for the limits. For example, using runner x and configuration option a.</td></tr><tr><td><p>Document the performance / internal metrics that your I/O collects including what they mean, and how they can be used (some connectors collect and publish performance metrics like latency/bundle size/etc)</td></tr><tr><td><p>Include expected performance characteristics of the I/O based on performance tests that the connector has in place.</td></tr></table></div><div class=feedback><p class=update>Last updated on 2024/05/03</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? 
Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a 
href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>