blob: 16a800f9e00f91dca1613be613899ce0b7e0e7e0 [file] [log] [blame]
<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Apache Beam: Developing I/O connectors for Java</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<script type=text/javascript src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script>
<script type=text/javascript src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script>
<script type=text/javascript src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script>
<script type=text/javascript src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script>
<script type=text/javascript src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script>
<script type=text/javascript src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script>
<script type=text/javascript src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script>
<script type=text/javascript src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script>
<script type=text/javascript src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/documentation/io/developing-io-java/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","//www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script><script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class=navbar-link href=/get-started/>Get Started</a>
<a class=navbar-link href=/documentation/>Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/developing-io-java.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class=navbar-link href=/community/>Community</a>
<a class=navbar-link href=/contribute/>Contribute</a>
<a class=navbar-link href=/blog/>Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/documentation/io/developing-io-java.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>function showSearch(){addPlaceholder();var e,t=document.querySelector(".searchBar");t.classList.remove("disappear"),e=document.querySelector("#iconsBar"),e.classList.add("disappear")}function addPlaceholder(){$("input:text").attr("placeholder","What are you looking for?")}function endSearch(){var e,t=document.querySelector(".searchBar");t.classList.add("disappear"),e=document.querySelector("#iconsBar"),e.classList.remove("disappear")}function blockScroll(){$("body").toggleClass("fixedPosition")}function openMenu(){addPlaceholder(),blockScroll()}</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Concepts</span><ul class=section-nav-list><li><a href=/documentation/basics/>Basics of the Beam model</a></li><li><a href=/documentation/runtime/model/>How Beam executes a pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Beam programming guide</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/>Overview</a></li><li><a href=/documentation/programming-guide/#creating-a-pipeline>Pipelines</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>PCollections</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pcollections>Creating a PCollection</a></li><li><a href=/documentation/programming-guide/#pcollection-characteristics>PCollection characteristics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#applying-transforms>Applying transforms</a></li><li><span class=section-nav-list-title>Core Beam transforms</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pardo>ParDo</a></li><li><a href=/documentation/programming-guide/#groupbykey>GroupByKey</a></li><li><a href=/documentation/programming-guide/#cogroupbykey>CoGroupByKey</a></li><li><a href=/documentation/programming-guide/#combine>Combine</a></li><li><a href=/documentation/programming-guide/#flatten>Flatten</a></li><li><a href=/documentation/programming-guide/#partition>Partition</a></li></ul></li><li><a href=/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms>Requirements for user code</a></li><li><a href=/documentation/programming-guide/#side-inputs>Side inputs</a></li><li><a href=/documentation/programming-guide/#additional-outputs>Additional outputs</a></li><li><a href=/documentation/programming-guide/#composite-transforms>Composite transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline I/O</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#pipeline-io>Using I/O transforms</a></li><li><a href=/documentation/io/connectors/>I/O connectors</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>I/O connector guides</span><ul class=section-nav-list><li><a href=/documentation/io/built-in/parquet/>Apache Parquet I/O connector</a></li><li><a href=/documentation/io/built-in/hadoop/>Hadoop Input/Output Format IO</a></li><li><a href=/documentation/io/built-in/hcatalog/>HCatalog IO</a></li><li><a href=/documentation/io/built-in/google-bigquery/>Google BigQuery I/O connector</a></li><li><a href=/documentation/io/built-in/snowflake/>Snowflake I/O connector</a></li><li><a href=/documentation/io/built-in/cdap/>CDAP I/O connector</a></li><li><a href=/documentation/io/built-in/sparkreceiver/>Spark Receiver I/O connector</a></li><li><a href=/documentation/io/built-in/singlestore/>SingleStoreDB I/O connector</a></li><li><a href=/documentation/io/built-in/webapis/>Web APIs I/O connector</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Developing new I/O connectors</span><ul class=section-nav-list><li><a href=/documentation/io/developing-io-overview/>Overview: Developing connectors</a></li><li><a href=/documentation/io/developing-io-java/>Developing connectors (Java)</a></li><li><a href=/documentation/io/developing-io-python/>Developing connectors (Python)</a></li><li><a href=/documentation/io/io-standards/>I/O Standards</a></li></ul></li><li><a href=/documentation/io/testing/>Testing I/O transforms</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Schemas</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#what-is-a-schema>What is a schema</a></li><li><a href=/documentation/programming-guide/#schemas-for-pl-types>Schemas for programming language types</a></li><li><a href=/documentation/programming-guide/#schema-definition>Schema definition</a></li><li><a href=/documentation/programming-guide/#logical-types>Logical types</a></li><li><a href=/documentation/programming-guide/#creating-schemas>Creating schemas</a></li><li><a href=/documentation/programming-guide/#using-schemas>Using schemas</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data encoding and type safety</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#data-encoding-and-type-safety>Data encoding basics</a></li><li><a href=/documentation/programming-guide/#specifying-coders>Specifying coders</a></li><li><a href=/documentation/programming-guide/#default-coders-and-the-coderregistry>Default coders and the CoderRegistry</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Windowing</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#windowing>Windowing basics</a></li><li><a href=/documentation/programming-guide/#provided-windowing-functions>Provided windowing functions</a></li><li><a href=/documentation/programming-guide/#setting-your-pcollections-windowing-function>Setting your PCollection’s windowing function</a></li><li><a href=/documentation/programming-guide/#watermarks-and-late-data>Watermarks and late data</a></li><li><a href=/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements>Adding timestamps to a PCollection’s elements</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Triggers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#triggers>Trigger basics</a></li><li><a href=/documentation/programming-guide/#event-time-triggers>Event time triggers and the default trigger</a></li><li><a href=/documentation/programming-guide/#processing-time-triggers>Processing time triggers</a></li><li><a href=/documentation/programming-guide/#data-driven-triggers>Data-driven triggers</a></li><li><a href=/documentation/programming-guide/#setting-a-trigger>Setting a trigger</a></li><li><a href=/documentation/programming-guide/#composite-triggers>Composite triggers</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Metrics</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#metrics>Metrics basics</a></li><li><a href=/documentation/programming-guide/#types-of-metrics>Types of metrics</a></li><li><a href=/documentation/programming-guide/#querying-metrics>Querying metrics</a></li><li><a href=/documentation/programming-guide/#using-metrics>Using metrics in pipeline</a></li><li><a href=/documentation/programming-guide/#export-metrics>Export metrics</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>State and Timers</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#types-of-state>Types of state</a></li><li><a href=/documentation/programming-guide/#deferred-state-reads>Deferred state reads</a></li><li><a href=/documentation/programming-guide/#timers>Timers</a></li><li><a href=/documentation/programming-guide/#garbage-collecting-state>Garbage collecting state</a></li><li><a href=/documentation/programming-guide/#state-timers-examples>State and timers examples</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Splittable DoFns</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#sdf-basics>Basics</a></li><li><a href=/documentation/programming-guide/#sizing-and-progress>Sizing and progress</a></li><li><a href=/documentation/programming-guide/#user-initiated-checkpoint>User-initiated checkpoint</a></li><li><a href=/documentation/programming-guide/#runner-initiated-split>Runner initiated split</a></li><li><a href=/documentation/programming-guide/#watermark-estimation>Watermark estimation</a></li><li><a href=/documentation/programming-guide/#truncating-during-drain>Truncating during drain</a></li><li><a href=/documentation/programming-guide/#bundle-finalization>Bundle finalization</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Multi-language Pipelines</span><ul class=section-nav-list><li><a href=/documentation/programming-guide/#create-x-lang-transforms>Creating cross-language transforms</a></li><li><a href=/documentation/programming-guide/#use-x-lang-transforms>Using cross-language transforms</a></li><li><a href=/documentation/programming-guide/#x-lang-transform-runner-support>Runner Support</a></li></ul></li><li><a href=/documentation/programming-guide/#batched-dofns>Batched DoFns</a></li><li><a href=/documentation/programming-guide/#transform-service>Transform service</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Pipeline development lifecycle</span><ul class=section-nav-list><li><a href=/documentation/pipelines/design-your-pipeline/>Design Your Pipeline</a></li><li><a href=/documentation/pipelines/create-your-pipeline/>Create Your Pipeline</a></li><li><a href=/documentation/pipelines/test-your-pipeline/>Test Your Pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Common pipeline patterns</span><ul class=section-nav-list><li><a href=/documentation/patterns/overview/>Overview</a></li><li><a href=/documentation/patterns/file-processing/>File processing</a></li><li><a href=/documentation/patterns/side-inputs/>Side inputs</a></li><li><a href=/documentation/patterns/pipeline-options/>Pipeline options</a></li><li><a href=/documentation/patterns/custom-io/>Custom I/O</a></li><li><a href=/documentation/patterns/custom-windows/>Custom windows</a></li><li><a href=/documentation/patterns/bigqueryio/>BigQueryIO</a></li><li><a href=/documentation/patterns/ai-platform/>AI Platform</a></li><li><a href=/documentation/patterns/schema/>Schema</a></li><li><a href=/documentation/patterns/bqml/>BigQuery ML</a></li><li><a href=/documentation/patterns/grouping-elements-for-efficient-external-service-calls/>Grouping elements for efficient external service calls</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>AI/ML pipelines</span><ul class=section-nav-list><li><a href=/documentation/ml/overview/>Get started with AI/ML</a></li><li><a href=/documentation/ml/about-ml/>About Beam ML</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Prediction and inference</span><ul class=section-nav-list><li><a href=/documentation/ml/inference-overview/>Overview</a></li><li><a href=/documentation/ml/multi-model-pipelines/>Build a pipeline with multiple models</a></li><li><a href=/documentation/ml/tensorrt-runinference>Build a custom model handler with TensorRT</a></li><li><a href=/documentation/ml/large-language-modeling>Use LLM inference</a></li><li><a href=/documentation/ml/multi-language-inference/>Build a multi-language inference pipeline</a></li><li><a href=/documentation/ml/side-input-updates/>Update your model in production</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Data processing</span><ul class=section-nav-list><li><a href=/documentation/ml/preprocess-data/>Preprocess data</a></li><li><a href=/documentation/ml/data-processing/>Explore your data</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Workflow orchestration</span><ul class=section-nav-list><li><a href=/documentation/ml/orchestration/>Use ML-OPS workflow orchestrators</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Model training</span><ul class=section-nav-list><li><a href=/documentation/ml/per-entity-training>Per-entity training</a></li><li><a href=/documentation/ml/online-clustering/>Online clustering</a></li><li><a href=/documentation/ml/model-evaluation/>ML model evaluation</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Use cases</span><ul class=section-nav-list><li><a href=/documentation/ml/anomaly-detection/>Build an anomaly detection pipeline</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Reference</span><ul class=section-nav-list><li><a href=/documentation/ml/runinference-metrics/>RunInference metrics</a></li><li><a href=/documentation/ml/model-evaluation/>Model validation</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Runtime systems</span><ul class=section-nav-list><li><a href=/documentation/runtime/environments/>Container environments</a></li><li><a href=/documentation/runtime/resource-hints/>Resource hints</a></li><li><a href=/documentation/runtime/sdk-harness-config/>SDK Harness Configuration</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Transform catalog</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Python</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li class=section-nav-item--collapsible><span class=section-nav-list-title>Enrichment</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/enrichment/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-bigtable/>Bigtable example</a></li><li><a href=/documentation/transforms/python/elementwise/enrichment-vertexai/>Vertex AI Feature Store examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/python/elementwise/flatmap/>FlatMap</a></li><li><a href=/documentation/transforms/python/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/python/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/python/elementwise/map/>Map</a></li><li><a href=/documentation/transforms/python/elementwise/mltransform/>MLTransform</a></li><li><a href=/documentation/transforms/python/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/python/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/python/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/python/elementwise/reify/>Reify</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>RunInference</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/elementwise/runinference/>Overview</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-pytorch/>PyTorch examples</a></li><li><a href=/documentation/transforms/python/elementwise/runinference-sklearn/>Sklearn examples</a></li></ul></li><li><a href=/documentation/transforms/python/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/python/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/python/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/python/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/python/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/combineglobally/>CombineGlobally</a></li><li><a href=/documentation/transforms/python/aggregation/combineperkey/>CombinePerKey</a></li><li><a href=/documentation/transforms/python/aggregation/combinevalues/>CombineValues</a></li><li><a href=/documentation/transforms/python/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/python/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/python/aggregation/groupby/>GroupBy</a></li><li><a href=/documentation/transforms/python/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/python/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/python/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/python/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/python/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/python/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/python/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/python/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/python/aggregation/top/>Top</a></li><li><a href=/documentation/transforms/python/aggregation/tolist/>ToList</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/python/other/create/>Create</a></li><li><a href=/documentation/transforms/python/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/python/other/reshuffle/>Reshuffle</a></li><li><a href=/documentation/transforms/python/other/windowinto/>WindowInto</a></li></ul></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Java</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/overview/>Overview</a></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Element-wise</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/elementwise/filter/>Filter</a></li><li><a href=/documentation/transforms/java/elementwise/flatmapelements/>FlatMapElements</a></li><li><a href=/documentation/transforms/java/elementwise/keys/>Keys</a></li><li><a href=/documentation/transforms/java/elementwise/kvswap/>KvSwap</a></li><li><a href=/documentation/transforms/java/elementwise/mapelements/>MapElements</a></li><li><a href=/documentation/transforms/java/elementwise/pardo/>ParDo</a></li><li><a href=/documentation/transforms/java/elementwise/partition/>Partition</a></li><li><a href=/documentation/transforms/java/elementwise/regex/>Regex</a></li><li><a href=/documentation/transforms/java/elementwise/reify/>Reify</a></li><li><a href=/documentation/transforms/java/elementwise/tostring/>ToString</a></li><li><a href=/documentation/transforms/java/elementwise/values/>Values</a></li><li><a href=/documentation/transforms/java/elementwise/withkeys/>WithKeys</a></li><li><a href=/documentation/transforms/java/elementwise/withtimestamps/>WithTimestamps</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Aggregation</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/aggregation/approximatequantiles/>ApproximateQuantiles</a></li><li><a href=/documentation/transforms/java/aggregation/approximateunique/>ApproximateUnique</a></li><li><a href=/documentation/transforms/java/aggregation/cogroupbykey/>CoGroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/combine/>Combine</a></li><li><a href=/documentation/transforms/java/aggregation/combinewithcontext/>CombineWithContext</a></li><li><a href=/documentation/transforms/java/aggregation/count/>Count</a></li><li><a href=/documentation/transforms/java/aggregation/distinct/>Distinct</a></li><li><a href=/documentation/transforms/java/aggregation/groupbykey/>GroupByKey</a></li><li><a href=/documentation/transforms/java/aggregation/groupintobatches/>GroupIntoBatches</a></li><li><a href=/documentation/transforms/java/aggregation/hllcount/>HllCount</a></li><li><a href=/documentation/transforms/java/aggregation/latest/>Latest</a></li><li><a href=/documentation/transforms/java/aggregation/max/>Max</a></li><li><a href=/documentation/transforms/java/aggregation/mean/>Mean</a></li><li><a href=/documentation/transforms/java/aggregation/min/>Min</a></li><li><a href=/documentation/transforms/java/aggregation/sample/>Sample</a></li><li><a href=/documentation/transforms/java/aggregation/sum/>Sum</a></li><li><a href=/documentation/transforms/java/aggregation/top/>Top</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Other</span><ul class=section-nav-list><li><a href=/documentation/transforms/java/other/create/>Create</a></li><li><a href=/documentation/transforms/java/other/flatten/>Flatten</a></li><li><a href=/documentation/transforms/java/other/passert/>PAssert</a></li><li><a href=/documentation/transforms/java/other/view/>View</a></li><li><a href=/documentation/transforms/java/other/window/>Window</a></li></ul></li></ul></li></ul></li><li><a href=/documentation/glossary/>Glossary</a></li><li><a href=https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam>Beam Wiki <img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#basic-code-reqs>Basic code requirements</a></li><li><a href=#implementing-the-source-interface>Implementing the Source interface</a><ul><li><a href=#implementing-the-source-subclass>Implementing the Source subclass</a><ul><li><a href=#boundedsource>BoundedSource</a></li><li><a href=#unboundedsource>UnboundedSource</a></li></ul></li><li><a href=#implementing-the-reader-subclass>Implementing the Reader subclass</a><ul><li><a href=#reader-methods-common-to-both-boundedreader-and-unboundedreader>Reader methods common to both BoundedReader and UnboundedReader</a></li><li><a href=#reader-methods-unique-to-unboundedreader>Reader methods unique to UnboundedReader</a></li><li><a href=#bounded-dynamic>Using your BoundedSource with dynamic work rebalancing</a></li></ul></li><li><a href=#convenience-source-and-reader-base-classes>Convenience Source and Reader base classes</a><ul><li><a href=#filebasedsource>FileBasedSource</a></li></ul></li></ul></li><li><a href=#using-filebasedsink>Using the FileBasedSink abstraction</a></li><li><a href=#ptransform-wrappers>PTransform wrappers</a></li></ul></nav></nav><div class="body__contained body__section-nav arrow-list arrow-list--no-mt"><h1 id=developing-io-connectors-for-java>Developing I/O connectors for Java</h1><p><strong>IMPORTANT:</strong> Use <code>Splittable DoFn</code> to develop your new I/O. For more details, read the
<a href=/documentation/io/developing-io-overview/>new I/O connector overview</a>.</p><p>To connect to a data store that isn’t supported by Beam’s existing I/O
connectors, you must create a custom I/O connector that usually consist of a
source and a sink. All Beam sources and sinks are composite transforms; however,
the implementation of your custom I/O depends on your use case. Before you
start, read the
<a href=/documentation/io/developing-io-overview/>new I/O connector overview</a>
for an overview of developing a new I/O connector, the available implementation
options, and how to choose the right option for your use case.</p><p>This guide covers using the <code>Source</code> and <code>FileBasedSink</code> interfaces using Java.
The Python SDK offers the same functionality, but uses a slightly different API.
See <a href=/documentation/io/developing-io-python/>Developing I/O connectors for Python</a>
for information specific to the Python SDK.</p><h2 id=basic-code-reqs>Basic code requirements</h2><p>Beam runners use the classes you provide to read and/or write data using
multiple worker instances in parallel. As such, the code you provide for
<code>Source</code> and <code>FileBasedSink</code> subclasses must meet some basic requirements:</p><ol><li><p><strong>Serializability:</strong> Your <code>Source</code> or <code>FileBasedSink</code> subclass, whether
bounded or unbounded, must be Serializable. A runner might create multiple
instances of your <code>Source</code> or <code>FileBasedSink</code> subclass to be sent to
multiple remote workers to facilitate reading or writing in parallel.</p></li><li><p><strong>Immutability:</strong>
Your <code>Source</code> or <code>FileBasedSink</code> subclass must be effectively immutable.
All private fields must be declared final, and all private variables of
collection type must be effectively immutable. If your class has setter
methods, those methods must return an independent copy of the object with
the relevant field modified.</p><p>You should only use mutable state in your <code>Source</code> or <code>FileBasedSink</code>
subclass if you are using lazy evaluation of expensive computations that
you need to implement the source or sink; in that case, you must declare
all mutable instance variables transient.</p></li><li><p><strong>Thread-Safety:</strong> Your code must be thread-safe. If you build your source
to work with dynamic work rebalancing, it is critical that you make your
code thread-safe. The Beam SDK provides a helper class to make this easier.
See <a href=#bounded-dynamic>Using Your BoundedSource with dynamic work rebalancing</a>
for more details.</p></li><li><p><strong>Testability:</strong> It is critical to exhaustively unit test all of your
<code>Source</code> and <code>FileBasedSink</code> subclasses, especially if you build your
classes to work with advanced features such as dynamic work rebalancing. A
minor implementation error can lead to data corruption or data loss (such
as skipping or duplicating records) that can be hard to detect.</p><p>To assist in testing <code>BoundedSource</code> implementations, you can use the
SourceTestUtils class. <code>SourceTestUtils</code> contains utilities for automatically
verifying some of the properties of your <code>BoundedSource</code> implementation. You
can use <code>SourceTestUtils</code> to increase your implementation&rsquo;s test coverage
using a wide range of inputs with relatively few lines of code. For
examples that use <code>SourceTestUtils</code>, see the
<a href=https://github.com/apache/beam/blob/master/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/io/AvroSourceTest.java>AvroSourceTest</a> and
<a href=https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java>TextIOReadTest</a>
source code.</p></li></ol><p>In addition, see the <a href=/contribute/ptransform-style-guide/>PTransform style guide</a>
for Beam&rsquo;s transform style guidance.</p><h2 id=implementing-the-source-interface>Implementing the Source interface</h2><p>To create a data source for your pipeline, you must provide the format-specific
logic that tells a runner how to read data from your input source, and how to
split your data source into multiple parts so that multiple worker instances can
read your data in parallel. If you&rsquo;re creating a data source that reads
unbounded data, you must provide additional logic for managing your source&rsquo;s
watermark and optional checkpointing.</p><p>Supply the logic for your source by creating the following classes:</p><ul><li><p>A subclass of <code>BoundedSource</code> if you want to read a finite (batch) data set,
or a subclass of <code>UnboundedSource</code> if you want to read an infinite (streaming)
data set. These subclasses describe the data you want to read, including the
data&rsquo;s location and parameters (such as how much data to read).</p></li><li><p>A subclass of <code>Source.Reader</code>. Each Source must have an associated Reader that
captures all the state involved in reading from that <code>Source</code>. This can
include things like file handles, RPC connections, and other parameters that
depend on the specific requirements of the data format you want to read.</p></li><li><p>The <code>Reader</code> class hierarchy mirrors the Source hierarchy. If you&rsquo;re extending
<code>BoundedSource</code>, you&rsquo;ll need to provide an associated <code>BoundedReader</code>. if you&rsquo;re
extending <code>UnboundedSource</code>, you&rsquo;ll need to provide an associated
<code>UnboundedReader</code>.</p></li><li><p>One or more user-facing wrapper composite transforms (<code>PTransform</code>) that
wrap read operations. <a href=#ptransform-wrappers>PTransform wrappers</a> discusses
why you should avoid exposing your sources.</p></li></ul><h3 id=implementing-the-source-subclass>Implementing the Source subclass</h3><p>You must create a subclass of either <code>BoundedSource</code> or <code>UnboundedSource</code>,
depending on whether your data is a finite batch or an infinite stream. In
either case, your <code>Source</code> subclass must override the abstract methods in the
superclass. A runner might call these methods when using your data source. For
example, when reading from a bounded source, a runner uses these methods to
estimate the size of your data set and to split it up for parallel reading.</p><p>Your <code>Source</code> subclass should also manage basic information about your data
source, such as the location. For example, the example <code>Source</code> implementation
in Beam’s <a href=https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.html>DatastoreIO</a>
class takes host, datasetID, and query as arguments. The connector uses these
values to obtain data from Cloud Datastore.</p><h4 id=boundedsource>BoundedSource</h4><p><code>BoundedSource</code> represents a finite data set from which a Beam runner may read,
possibly in parallel. <code>BoundedSource</code> contains a set of abstract methods that
the runner uses to split the data set for reading by multiple workers.</p><p>To implement a <code>BoundedSource</code>, your subclass must override the following
abstract methods:</p><ul><li><p><code>split</code>: The runner uses this method to split your finite data
into bundles of a given size.</p></li><li><p><code>getEstimatedSizeBytes</code>: The runner uses this method to estimate the total
size of your data, in bytes.</p></li><li><p><code>createReader</code>: Creates the associated <code>BoundedReader</code> for this
<code>BoundedSource</code>.</p></li></ul><p>You can see a model of how to implement <code>BoundedSource</code> and the required
abstract methods in Beam’s implementations for Cloud BigTable
(<a href=https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java>BigtableIO.java</a>)
and BigQuery (<a href=https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java>BigQuerySourceBase.java</a>).</p><h4 id=unboundedsource>UnboundedSource</h4><p><code>UnboundedSource</code> represents an infinite data stream from which the runner may
read, possibly in parallel. <code>UnboundedSource</code> contains a set of abstract methods
that the runner uses to support streaming reads in parallel; these include
<em>checkpointing</em> for failure recovery, <em>record IDs</em> to prevent data duplication,
and <em>watermarking</em> for estimating data completeness in downstream parts of your
pipeline.</p><p>To implement an <code>UnboundedSource</code>, your subclass must override the following
abstract methods:</p><ul><li><p><code>split</code>: The runner uses this method to generate a list of
<code>UnboundedSource</code> objects which represent the number of sub-stream instances
from which the service should read in parallel.</p></li><li><p><code>getCheckpointMarkCoder</code>: The runner uses this method to obtain the Coder for
the checkpoints for your source (if any).</p></li><li><p><code>requiresDeduping</code>: The runner uses this method to determine whether the data
requires explicit removal of duplicate records. If this method returns true,
the runner will automatically insert a step to remove duplicates from your
source&rsquo;s output. This should return true if and only if your source
provides record IDs for each record. See <code>UnboundedReader.getCurrentRecordId</code>
for when this should be done.</p></li><li><p><code>createReader</code>: Creates the associated <code>UnboundedReader</code> for this
<code>UnboundedSource</code>.</p></li></ul><h3 id=implementing-the-reader-subclass>Implementing the Reader subclass</h3><p>You must create a subclass of either <code>BoundedReader</code> or <code>UnboundedReader</code> to be
returned by your source subclass&rsquo;s <code>createReader</code> method. The runner uses the
methods in your <code>Reader</code> (whether bounded or unbounded) to do the actual reading
of your dataset.</p><p><code>BoundedReader</code> and <code>UnboundedReader</code> have similar basic interfaces, which
you&rsquo;ll need to define. In addition, there are some additional methods unique to
<code>UnboundedReader</code> that you&rsquo;ll need to implement for working with unbounded data,
and an optional method you can implement if you want your <code>BoundedReader</code> to
take advantage of dynamic work rebalancing. There are also minor differences in
the semantics for the <code>start()</code> and <code>advance()</code> methods when using
<code>UnboundedReader</code>.</p><h4 id=reader-methods-common-to-both-boundedreader-and-unboundedreader>Reader methods common to both BoundedReader and UnboundedReader</h4><p>A runner uses the following methods to read data using <code>BoundedReader</code> or
<code>UnboundedReader</code>:</p><ul><li><p><code>start</code>: Initializes the <code>Reader</code> and advances to the first record to be read.
This method is called exactly once when the runner begins reading your data,
and is a good place to put expensive operations needed for initialization.</p></li><li><p><code>advance</code>: Advances the reader to the next valid record. This method must
return false if there is no more input available. <code>BoundedReader</code> should stop
reading once advance returns false, but <code>UnboundedReader</code> can return true in
future calls once more data is available from your stream.</p></li><li><p><code>getCurrent</code>: Returns the data record at the current position, last read by
start or advance.</p></li><li><p><code>getCurrentTimestamp</code>: Returns the timestamp for the current data record. You
only need to override <code>getCurrentTimestamp</code> if your source reads data that has
intrinsic timestamps. The runner uses this value to set the intrinsic
timestamp for each element in the resulting output <code>PCollection</code>.</p></li></ul><h4 id=reader-methods-unique-to-unboundedreader>Reader methods unique to UnboundedReader</h4><p>In addition to the basic <code>Reader</code> interface, <code>UnboundedReader</code> has some
additional methods for managing reads from an unbounded data source:</p><ul><li><p><code>getCurrentRecordId</code>: Returns a unique identifier for the current record.
The runner uses these record IDs to filter out duplicate records. If your
data has logical IDs present in each record, you can have this method return
them; otherwise, you can return a hash of the record contents, using at
least a 128-bit hash. It is incorrect to use Java&rsquo;s <code>Object.hashCode()</code>, as
a 32-bit hash is generally insufficient for preventing collisions, and
<code>hasCode()</code> is not guaranteed to be stable across processes.</p><p>Implementing <code>getCurrentRecordId</code> is optional if your source uses a
checkpointing scheme that uniquely identifies each record. For example, if
your splits are files and the checkpoints are file positions up to which all
data has been read, you do not need record IDs. However, record IDs can
still be useful if upstream systems writing data to your source occasionally
produce duplicate records that your source might then read.</p></li><li><p><code>getWatermark</code>: Returns a watermark that your <code>Reader</code> provides. The watermark
is the approximate lower bound on timestamps of future elements to be read
by your <code>Reader</code>. The runner uses the watermark as an estimate of data
completeness. Watermarks are used in windowing and triggers.</p></li><li><p><code>getCheckpointMark</code>: The runner uses this method to create a checkpoint in
your data stream. The checkpoint represents the progress of the
<code>UnboundedReader</code>, which can be used for failure recovery. Different data
streams may use different checkpointing methods; some sources might require
received records to be acknowledged, while others might use positional
checkpointing. You&rsquo;ll need to tailor this method to the most appropriate
checkpointing scheme. For example, you might have this method return the
most recently acked record(s).</p></li><li><p><code>getCheckpointMark</code> is optional; you don&rsquo;t need to implement it if your data
does not have meaningful checkpoints. However, if you choose not to
implement checkpointing in your source, you may encounter duplicate data or
data loss in your pipeline, depending on whether your data source tries to
re-send records in case of errors.</p></li></ul><p>You can read a bounded <code>PCollection</code> from an <code>UnboundedSource</code> by specifying
either <code>.withMaxNumRecords</code> or <code>.withMaxReadTime</code> when you read from your
source. <code>.withMaxNumRecords</code> reads a fixed maximum number of records from your
unbounded source, while <code>.withMaxReadTime</code> reads from your unbounded source for
a fixed maximum time duration.</p><h4 id=bounded-dynamic>Using your BoundedSource with dynamic work rebalancing</h4><p>If your source provides bounded data, you can have your <code>BoundedReader</code> work
with dynamic work rebalancing by implementing the method <code>splitAtFraction</code>. The
runner may call <code>splitAtFraction</code> concurrently with start or advance on a given
reader so that the remaining data in your <code>Source</code> can be split and
redistributed to other workers.</p><p>When you implement <code>splitAtFraction</code>, your code must produce a
mutually-exclusive set of splits where the union of those splits matches the
total data set.</p><p>If you implement <code>splitAtFraction</code>, you must implement both <code>splitAtFraction</code>
and <code>getFractionConsumed</code> in a thread-safe manner, or data loss is possible. You
should also unit-test your implementation exhaustively to avoid data duplication
or data loss.</p><p>To ensure that your code is thread-safe, use the <code>RangeTracker</code> thread-safe
helper object to manage positions in your data source when implementing
<code>splitAtFraction</code> and <code>getFractionConsumed</code>.</p><p>We highly recommended that you unit test your implementations of
<code>splitAtFraction</code> using the <code>SourceTestUtils</code> class. <code>SourceTestUtils</code> contains
a number of methods for testing your implementation of <code>splitAtFraction</code>,
including exhaustive automatic testing.</p><h3 id=convenience-source-and-reader-base-classes>Convenience Source and Reader base classes</h3><p>The Beam SDK contains some convenient abstract base classes to help you create
<code>Source</code> and <code>Reader</code> classes that work with common data storage formats, like
files.</p><h4 id=filebasedsource>FileBasedSource</h4><p>If your data source uses files, you can derive your <code>Source</code> and <code>Reader</code>
classes from the <code>FileBasedSource</code> and <code>FileBasedReader</code> abstract base classes.
<code>FileBasedSource</code> is a bounded source subclass that implements code common to
Beam sources that interact with files, including:</p><ul><li>File pattern expansion</li><li>Sequential record reading</li><li>Split points</li></ul><h2 id=using-filebasedsink>Using the FileBasedSink abstraction</h2><p>If your data source uses files, you can implement the <code>FileBasedSink</code>
abstraction to create a file-based sink. For other sinks, use <code>ParDo</code>,
<code>GroupByKey</code>, and other transforms offered by the Beam SDK for Java. See the
<a href=/documentation/io/developing-io-overview/>developing I/O connectors overview</a>
for more details.</p><p>When using the <code>FileBasedSink</code> interface, you must provide the format-specific
logic that tells the runner how to write bounded data from your pipeline&rsquo;s
<code>PCollection</code>s to an output sink. The runner writes bundles of data in parallel
using multiple workers.</p><p>Supply the logic for your file-based sink by implementing the following classes:</p><ul><li><p>A subclass of the abstract base class <code>FileBasedSink</code>. <code>FileBasedSink</code>
describes a location or resource that your pipeline can write to in
parallel. To avoid exposing your sink to end-users, your <code>FileBasedSink</code>
subclass should be protected or private.</p></li><li><p>A user-facing wrapper <code>PTransform</code> that, as part of the logic, calls
<a href=https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java>WriteFiles</a>
and passes your <code>FileBasedSink</code> as a parameter. A user should not need to
call <code>WriteFiles</code> directly.</p></li></ul><p>The <code>FileBasedSink</code> abstract base class implements code that is common to Beam
sinks that interact with files, including:</p><ul><li>Setting file headers and footers</li><li>Sequential record writing</li><li>Setting the output MIME type</li></ul><p><code>FileBasedSink</code> and its subclasses support writing files to any Beam-supported
<code>FileSystem</code> implementations. See the following Beam-provided <code>FileBasedSink</code>
implementations for examples:</p><ul><li><a href=https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java>TextSink</a> and</li><li><a href=https://github.com/apache/beam/blob/master/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroSink.java>AvroSink</a>.</li></ul><h2 id=ptransform-wrappers>PTransform wrappers</h2><p>When you create a source or sink that end-users will use, avoid exposing your
source or sink code. To avoid exposing your sources and sinks to end-users, your
new classes should be protected or private. Then, implement a user-facing
wrapper <code>PTransform</code>. By exposing your source or sink as a transform, your
implementation is hidden and can be arbitrarily complex or simple. The greatest
benefit of not exposing implementation details is that later on, you can add
additional functionality without breaking the existing implementation for users.</p><p>For example, if your users’ pipelines read from your source using
<code>read</code> and you want to insert a reshard into the pipeline, all
users would need to add the reshard themselves (using the <code>GroupByKey</code>
transform). To solve this, we recommended that you expose the source as a
composite <code>PTransform</code> that performs both the read operation and the reshard.</p><p>See Beam’s <a href=/contribute/ptransform-style-guide/#exposing-a-ptransform-vs-something-else>PTransform style guide</a>
for additional information about wrapping with a <code>PTransform</code>.</p><div class=feedback><p class=update>Last updated on 2024/05/03</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it all useful and clear? Is there anything that you would like to change? Let us know!</p><button class=load-button><a href="https://docs.google.com/forms/d/e/1FAIpQLSfID7abne3GE6k6RdJIyZhPz2Gef7UkpggUEhTIDjjplHuxSA/viewform?usp=header_link" target=_blank>SEND FEEDBACK</a></button></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col__logo><img src=/images/beam_logo_circle.svg class=footer__logo alt="Beam logo"></div><div class=footer__cols__col__logo><img src=/images/apache_logo_circle.svg class=footer__logo alt="Apache logo"></div></div><div class=footer-wrapper><div class=wrapper-grid><div class=footer__cols__col><div class=footer__cols__col__title>Start</div><div class=footer__cols__col__link><a href=/get-started/beam-overview/>Overview</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-java/>Quickstart (Java)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-py/>Quickstart (Python)</a></div><div class=footer__cols__col__link><a href=/get-started/quickstart-go/>Quickstart (Go)</a></div><div class=footer__cols__col__link><a href=/get-started/downloads/>Downloads</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Docs</div><div class=footer__cols__col__link><a href=/documentation/programming-guide/>Concepts</a></div><div class=footer__cols__col__link><a href=/documentation/pipelines/design-your-pipeline/>Pipelines</a></div><div class=footer__cols__col__link><a href=/documentation/runners/capability-matrix/>Runners</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Community</div><div class=footer__cols__col__link><a href=/contribute/>Contribute</a></div><div class=footer__cols__col__link><a href=https://projects.apache.org/committee.html?beam target=_blank>Team<img src=/images/external-link-icon.png width=14 height=14 alt="External link."></a></div><div class=footer__cols__col__link><a href=/community/presentation-materials/>Media</a></div><div class=footer__cols__col__link><a href=/community/in-person/>Events/Meetups</a></div><div class=footer__cols__col__link><a href=/community/contact-us/>Contact Us</a></div></div><div class=footer__cols__col><div class=footer__cols__col__title>Resources</div><div class=footer__cols__col__link><a href=/blog/>Blog</a></div><div class=footer__cols__col__link><a href=https://github.com/apache/beam>GitHub</a></div></div></div><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div><div class="footer__cols__col footer__cols__col__logos"><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://github.com/apache/beam><img src=/images/logos/social-icons/github-logo-150.png class=footer__logo alt="Github logo"></a></div><div class=footer__cols__col__logo><a href=https://www.linkedin.com/company/apache-beam/><img src=/images/logos/social-icons/linkedin-logo-150.png class=footer__logo alt="Linkedin logo"></a></div></div><div class=footer__cols__col--group><div class=footer__cols__col__logo><a href=https://twitter.com/apachebeam><img src=/images/logos/social-icons/twitter-logo-150.png class=footer__logo alt="Twitter logo"></a></div><div class=footer__cols__col__logo><a href=https://www.youtube.com/channel/UChNnb_YO_7B0HlW6FhAXZZQ><img src=/images/logos/social-icons/youtube-logo-150.png class=footer__logo alt="Youtube logo"></a></div></div></div></div></div></footer></body></html>